CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

docs/client-api.md

Client API

Prefect's client API provides programmatic access to the Prefect server and cloud services through HTTP clients. This enables interaction with flows, deployments, work pools, and orchestration services from external applications and scripts.

Capabilities

Client Factory

Get HTTP client instances for interacting with Prefect API services.

def get_client(
    httpx_settings: Optional[dict] = None,
    sync_client: Optional[bool] = None,
) -> Union[PrefectClient, SyncPrefectClient]:
    """
    Get a Prefect HTTP client for API interaction.

    Parameters:
    - httpx_settings: Custom HTTPX client configuration, or None for the
      client's defaults
    - sync_client: Pass True to get a synchronous client; when omitted an
      asynchronous client is returned

    Returns:
    PrefectClient (async) or SyncPrefectClient (sync) instance

    The client type is selected by the sync_client argument; when it is not
    provided an async client is returned.

    NOTE(review): an earlier revision of this text also listed "current
    context (async if in async function)" as a selection rule. Nothing in
    this reference establishes that behaviour -- confirm against the
    installed Prefect version before relying on it.
    """

Usage Examples

from prefect.client.orchestration import get_client

# Get async client (default)
async def async_example():
    """Read flows with the asynchronous client and return them."""
    # Entering the client with "async with" releases its HTTP connections
    # when the block exits, instead of leaving them open.
    async with get_client() as client:
        flows = await client.read_flows()
    return flows

# Get sync client explicitly
def sync_example():
    """Read flows with the synchronous client and return them."""
    # The "with" block closes the client's HTTP connections on exit.
    with get_client(sync_client=True) as client:
        flows = client.read_flows()
    return flows

# Client with custom HTTPX settings
import httpx

# HTTPX expects connection limits as an httpx.Limits instance, not a plain
# dict; a float timeout is accepted as-is.
client = get_client(httpx_settings={
    "timeout": 30.0,
    "limits": httpx.Limits(max_connections=10),
})
# Enter the client ("async with client:") before issuing requests so that its
# connections are closed afterwards.

Async Prefect Client

Asynchronous HTTP client for Prefect API operations with full async/await support.

class PrefectClient:
    """
    Asynchronous HTTP client for the Prefect API.

    Provides comprehensive access to Prefect server functionality including
    flows, deployments, flow runs, task runs, work pools, and orchestration.

    Every request method below is a coroutine and must be awaited. Use the
    client as an async context manager (``async with``) so that its HTTP
    connections are released when the block exits.
    """

    def __init__(
        self,
        api: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict] = None,
    ):
        """
        Initialize the Prefect client.

        Parameters:
        - api: Prefect API URL (defaults to PREFECT_API_URL setting)
        - api_key: API key for authentication (defaults to PREFECT_API_KEY setting)
        - api_version: API version to use (defaults to current version)
        - httpx_settings: Custom HTTPX client configuration

        NOTE(review): upstream Prefect releases take ``api`` as a required
        argument; building the client through get_client() avoids depending
        on these defaults -- confirm against the installed version.
        """

    async def create_flow_run(
        self,
        flow: Flow,
        name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        parent_task_run_id: Optional[UUID] = None,
        state: Optional[State] = None,
    ) -> FlowRun:
        """
        Create a new flow run.

        Parameters:
        - flow: Flow object to create a run for
        - name: Custom name for the flow run
        - parameters: Parameters to pass to the flow
        - context: Context data for the run
        - tags: Tags to apply to the flow run
        - parent_task_run_id: Parent task run if this is a subflow
        - state: Initial state for the flow run

        Returns:
        Created FlowRun object
        """

    async def read_flows(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[FlowSort] = None,
        flow_filter: Optional[FlowFilter] = None,
    ) -> List[Flow]:
        """
        Read flows from the API.

        Parameters:
        - limit: Maximum number of flows to return
        - offset: Number of flows to skip
        - sort: Sorting configuration
        - flow_filter: Filter for the flows to return (the same keyword that
          read_deployments accepts)

        Returns:
        List of Flow objects
        """

    async def read_deployments(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[DeploymentSort] = None,
        flow_filter: Optional[FlowFilter] = None,
        deployment_filter: Optional[DeploymentFilter] = None,
    ) -> List[Deployment]:
        """
        Read deployments from the API.

        Parameters:
        - limit: Maximum number of deployments to return
        - offset: Number of deployments to skip
        - sort: Sorting configuration
        - flow_filter: Filter for associated flows
        - deployment_filter: Filter for deployments

        Returns:
        List of Deployment objects
        """

    async def create_deployment(
        self,
        flow_id: UUID,
        name: str,
        version: Optional[str] = None,
        schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
        parameters: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        **kwargs
    ) -> Deployment:
        """
        Create a new deployment.

        Parameters:
        - flow_id: ID of the flow to deploy
        - name: Name for the deployment
        - version: Version string
        - schedule: Scheduling configuration
        - parameters: Default parameters
        - tags: Deployment tags
        - work_pool_name: Target work pool
        - work_queue_name: Target work queue
        - **kwargs: Additional keyword arguments; their meaning is not
          described in this reference

        Returns:
        Created Deployment object
        """

    async def read_work_pools(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        work_pool_filter: Optional[WorkPoolFilter] = None,
    ) -> List[WorkPool]:
        """
        Read work pools from the API.

        Parameters:
        - limit: Maximum number of work pools to return
        - offset: Number of work pools to skip
        - work_pool_filter: Filter criteria

        Returns:
        List of WorkPool objects
        """

    async def create_work_pool(
        self,
        name: str,
        type: str,
        description: Optional[str] = None,
        is_paused: bool = False,
        concurrency_limit: Optional[int] = None,
        **kwargs
    ) -> WorkPool:
        """
        Create a new work pool.

        Parameters:
        - name: Work pool name
        - type: Work pool type (process, docker, kubernetes, etc.); the
          parameter name shadows the ``type`` builtin inside this method
        - description: Work pool description
        - is_paused: Whether to start paused
        - concurrency_limit: Maximum concurrent workers
        - **kwargs: Additional keyword arguments; their meaning is not
          described in this reference

        Returns:
        Created WorkPool object
        """

    async def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a flow run.

        Parameters:
        - flow_run_id: ID of the flow run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
        OrchestrationResult with state change details
        """

    async def set_task_run_state(
        self,
        task_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a task run.

        Parameters:
        - task_run_id: ID of the task run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
        OrchestrationResult with state change details
        """

Usage Examples

from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import Flow, Deployment
from prefect.states import Completed

async def client_operations():
    """
    Walk through common async client calls: list flows, create a flow run,
    list deployments, and set the new run's state.

    Returns the created flow run, or None when no flows were returned.
    """
    # get_client() builds a PrefectClient from the active settings; entering
    # it with "async with" closes the HTTP connections when the block exits.
    async with get_client() as client:
        # List flows
        flows = await client.read_flows(limit=10)
        for flow in flows:
            print(f"Flow: {flow.name}")

        # Nothing to create a run for: avoid an IndexError on flows[0]
        if not flows:
            return None

        # Create a flow run
        flow_run = await client.create_flow_run(
            flow=flows[0],
            parameters={"param1": "value1"},
            tags=["api-created"],
        )

        # List deployments
        deployments = await client.read_deployments()
        print(f"Deployments: {len(deployments)}")

        # Update flow run state
        result = await client.set_flow_run_state(
            flow_run.id,
            Completed(message="Completed via API"),
        )
        print(f"State change status: {result.status}")

        return flow_run

Sync Prefect Client

Synchronous HTTP client providing the same functionality as the async client but with blocking operations.

class SyncPrefectClient:
    """
    Synchronous HTTP client for the Prefect API.

    Provides the same functionality as PrefectClient but with
    synchronous method calls that block until completion. Use it inside a
    ``with`` block so that its HTTP connections are released on exit.
    """

    def __init__(
        self,
        api: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict] = None,
    ):
        """
        Initialize the synchronous Prefect client.

        Parameters:
        - api: Prefect API URL
        - api_key: API key for authentication
        - api_version: API version to use
        - httpx_settings: Custom HTTPX client configuration
        """

    def create_flow_run(
        self,
        flow: Flow,
        name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        parent_task_run_id: Optional[UUID] = None,
        state: Optional[State] = None,
    ) -> FlowRun:
        """
        Create a new flow run (synchronous version).

        Takes the same parameters as the async client's create_flow_run and
        returns the created FlowRun object.
        """

    def read_flows(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[FlowSort] = None,
        flow_filter: Optional[FlowFilter] = None,
    ) -> List[Flow]:
        """
        Read flows from the API (synchronous version).

        Parameters:
        - limit: Maximum number of flows to return
        - offset: Number of flows to skip
        - sort: Sorting configuration
        - flow_filter: Filter for the flows to return (the same keyword that
          read_deployments accepts)

        Returns:
        List of Flow objects
        """

    def read_deployments(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[DeploymentSort] = None,
        flow_filter: Optional[FlowFilter] = None,
        deployment_filter: Optional[DeploymentFilter] = None,
    ) -> List[Deployment]:
        """
        Read deployments from the API (synchronous version).

        Parameters:
        - limit: Maximum number of deployments to return
        - offset: Number of deployments to skip
        - sort: Sorting configuration
        - flow_filter: Filter for associated flows
        - deployment_filter: Filter for deployments

        Returns:
        List of Deployment objects
        """

    def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set flow run state (synchronous version).

        Parameters:
        - flow_run_id: ID of the flow run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
        OrchestrationResult with state change details
        """

Usage Examples

from prefect.client.orchestration import get_client
from prefect.states import Running

def sync_client_operations():
    """
    Read flows, create a run for the first one, and mark it Running using
    the synchronous client.

    Returns the created flow run, or None when no flows were returned.
    """
    # get_client(sync_client=True) builds a SyncPrefectClient from the active
    # settings; the "with" block closes its HTTP connections on exit.
    with get_client(sync_client=True) as client:
        # All operations are synchronous
        flows = client.read_flows(limit=5)

        # Make the "no flows" outcome explicit instead of falling off the end
        if not flows:
            return None

        flow_run = client.create_flow_run(
            flow=flows[0],
            parameters={"sync": True},
        )

        # Update state
        client.set_flow_run_state(
            flow_run.id,
            Running(message="Started via sync client"),
        )

        return flow_run

Cloud Client

Specialized client for Prefect Cloud services with additional cloud-specific functionality.

def get_cloud_client(
    host: Optional[str] = None,
    api_key: Optional[str] = None,
    httpx_settings: Optional[dict] = None,
    infer_cloud_url: bool = True,
) -> CloudClient:
    """
    Get a Prefect Cloud client.

    Parameters:
    - host: Prefect Cloud host URL
    - api_key: Cloud API key
    - httpx_settings: Custom HTTPX configuration
    - infer_cloud_url: Whether to automatically detect cloud URL (enabled by
      default)

    Returns:
    CloudClient instance for Prefect Cloud operations

    NOTE(review): what is used when host or api_key is left as None is not
    stated in this reference -- presumably the configured Prefect settings;
    confirm.
    """

class CloudClient:
    """
    Client for Prefect Cloud services.

    Offers cloud-specific functionality: workspace management, user
    operations, and cloud settings. All methods listed here are coroutines.

    NOTE(review): earlier text described this class as extending
    PrefectClient, but the declaration shown here has no base class --
    confirm against the installed version before calling PrefectClient
    methods on it.
    """
    
    async def read_workspaces(self) -> List[Workspace]:
        """Read available workspaces and return them as a list."""
    
    async def create_workspace(
        self,
        name: str,
        description: Optional[str] = None,
    ) -> Workspace:
        """
        Create a new workspace.

        Parameters:
        - name: Name for the workspace
        - description: Optional description text

        Returns:
        Created Workspace object
        """
    
    async def read_current_user(self) -> User:
        """Read current user information and return it as a User."""
    
    async def read_workspace_settings(
        self,
        workspace_id: UUID,
    ) -> WorkspaceSettings:
        """
        Read workspace configuration settings.

        Parameters:
        - workspace_id: ID of the workspace whose settings are read

        Returns:
        WorkspaceSettings object
        """

Error Handling

Exception classes for handling client API errors.

class PrefectHTTPStatusError(Exception):
    """
    HTTP status error from Prefect API.

    Base class of ObjectNotFound and AuthenticationError in this reference.
    """
    
    def __init__(
        self,
        message: str,
        response: httpx.Response,
        request: Optional[httpx.Request] = None,
    ):
        """
        Initialize HTTP status error.

        Parameters:
        - message: Error message
        - response: HTTPX response associated with the error
        - request: HTTPX request associated with the error, if available
        """
    
    @property
    def status_code(self) -> int:
        """
        HTTP status code.

        NOTE(review): presumably taken from the stored response -- confirm.
        """

class ObjectNotFound(PrefectHTTPStatusError):
    """
    Raised when a requested object is not found.

    Declared as a subclass of PrefectHTTPStatusError, so a handler for the
    base class also catches it; it adds no members of its own here.
    """

class ValidationError(Exception):
    """
    Raised when request data fails validation.

    Derives directly from Exception rather than PrefectHTTPStatusError, so
    handlers for PrefectHTTPStatusError do not catch it.
    """

class AuthenticationError(PrefectHTTPStatusError):
    """
    Raised when authentication fails.

    Declared as a subclass of PrefectHTTPStatusError, so a handler for the
    base class also catches it.

    NOTE(review): confirm that the installed Prefect version exports an
    exception with this name from prefect.exceptions.
    """

Usage Examples

from prefect.client.orchestration import get_client
from prefect.exceptions import ObjectNotFound, AuthenticationError

async def error_handling_example():
    """Look up a flow by name and report the different failure modes."""
    # "async with" closes the client's HTTP connections on every path,
    # including the error branches below.
    async with get_client() as client:
        try:
            # This might raise ObjectNotFound
            flow = await client.read_flow_by_name("non-existent-flow")
        except ObjectNotFound:
            print("Flow not found")
        except AuthenticationError:
            print("Authentication failed")
        except Exception as e:
            # Last-resort handler for the example; specific cases come first
            print(f"Unexpected error: {e}")
        else:
            # Success path: the lookup result is only valid here
            print(f"Found flow: {flow.name}")

Filtering and Sorting

Query filters and sorting options for API operations.

class FlowFilter:
    """Filter criteria for flow queries."""
    
    def __init__(
        self,
        name: Optional[Union[str, List[str]]] = None,
        tags: Optional[Union[str, List[str]]] = None,
        id: Optional[Union[UUID, List[UUID]]] = None,
    ):
        """
        Initialize flow filter.

        Each criterion accepts a single value or a list of values; None (the
        default) leaves that criterion unset.

        Parameters:
        - name: Flow name or names
        - tags: Tag or tags
        - id: Flow ID or IDs (the parameter name shadows the ``id`` builtin
          inside this method)
        """

class DeploymentFilter:
    """Filter criteria for deployment queries."""
    
    def __init__(
        self,
        name: Optional[Union[str, List[str]]] = None,
        tags: Optional[Union[str, List[str]]] = None,
        work_pool_name: Optional[Union[str, List[str]]] = None,
        is_schedule_active: Optional[bool] = None,
    ):
        """
        Initialize deployment filter.

        None (the default) leaves a criterion unset.

        Parameters:
        - name: Deployment name or names
        - tags: Tag or tags
        - work_pool_name: Work pool name or names
        - is_schedule_active: Schedule-active flag to match
        """

class FlowSort:
    """Sorting configuration for flow queries."""
    
    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """
        Initialize flow sorting.
        
        Parameters:
        - field: Field to sort by (name, created, updated); required
        - direction: Sort direction (asc, desc); defaults to "asc"
        """

class DeploymentSort:
    """Sorting configuration for deployment queries."""
    
    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """
        Initialize deployment sorting.

        Parameters:
        - field: Field to sort by; required. The accepted field names are not
          listed in this reference -- presumably the same as FlowSort's;
          confirm.
        - direction: Sort direction; defaults to "asc"
        """

Usage Examples

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, DeploymentFilter
from prefect.client.schemas.sorting import FlowSort

async def filtering_example():
    """
    Query flows by tag and deployments by work pool.

    Returns a (flows, deployments) tuple so the query results can be used
    by the caller.
    """
    # "async with" closes the client's HTTP connections when the block exits.
    async with get_client() as client:
        # Filter flows by tags
        flow_filter = FlowFilter(tags=["production", "etl"])
        flows = await client.read_flows(
            flow_filter=flow_filter,
            sort=FlowSort(field="name", direction="asc"),
        )

        # Filter deployments by work pool
        deployment_filter = DeploymentFilter(
            work_pool_name="kubernetes-pool",
            is_schedule_active=True,
        )
        deployments = await client.read_deployments(
            deployment_filter=deployment_filter,
            limit=20,
        )

    return flows, deployments

Types

Types specific to client API operations:

from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import httpx

class OrchestrationResult:
    """
    Result of an orchestration operation.

    The annotations are written as strings (forward references) because
    SetStateStatus and OrchestrationDetails are declared after this class
    and State is not declared in this listing; unquoted names would raise
    NameError when the class body is executed.
    """
    # State carried by the result
    state: "State"
    # Status reported for the state change (a SetStateStatus member)
    status: "SetStateStatus"
    # Accompanying orchestration details
    details: "OrchestrationDetails"

class SetStateStatus(str, Enum):
    """
    Status of a state change operation.

    Members are strings as well as enum members (the class mixes in str),
    and each member's value is identical to its name.
    """
    ACCEPT = "ACCEPT"
    REJECT = "REJECT"
    ABORT = "ABORT"
    WAIT = "WAIT"

class OrchestrationDetails:
    """
    Details about orchestration decisions.

    Every field is optional and may be None.
    """
    # ID of the related flow run, if any
    flow_run_id: Optional[UUID]
    # ID of the related task run, if any
    task_run_id: Optional[UUID]
    # ID of the transition, if any
    transition_id: Optional[UUID]

class Workspace:
    """Prefect Cloud workspace."""
    # Unique identifier of the workspace
    id: UUID
    # Workspace name
    name: str
    # Description text; may be None
    description: Optional[str]
    # Creation timestamp
    created: datetime
    # Last-update timestamp
    updated: datetime

class User:
    """Prefect Cloud user."""
    # Unique identifier of the user
    id: UUID
    # Email address of the user
    email: str
    # Name of the user
    name: str
    # Creation timestamp
    created: datetime

class WorkspaceSettings:
    """Workspace configuration settings."""
    # ID of the workspace these settings belong to
    workspace_id: UUID
    # Settings keyed by name; the value types are not constrained here
    settings: Dict[str, Any]

# Filter and sort types
class FlowRunFilter:
    """Placeholder filter type for flow run queries; no fields are declared here."""

class TaskRunFilter:
    """Placeholder filter type for task run queries; no fields are declared here."""

class WorkPoolFilter:
    """Placeholder filter type for work pool queries; no fields are declared here."""

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json