Workflow orchestration and management framework for building resilient data pipelines.
—
Prefect's client API provides programmatic access to the Prefect server and cloud services through HTTP clients. This enables interaction with flows, deployments, work pools, and orchestration services from external applications and scripts.
Get HTTP client instances for interacting with Prefect API services.
def get_client(
    httpx_settings: dict = None,
    sync_client: bool = None,
) -> Union[PrefectClient, SyncPrefectClient]:
    """
    Return an HTTP client for talking to the Prefect API.

    Parameters:
    - httpx_settings: Custom HTTPX client configuration
    - sync_client: Request a synchronous client; an async client is the
      default when this is omitted

    Returns:
    PrefectClient (async) or SyncPrefectClient (sync) instance

    The client type is resolved in this order:
    1. The sync_client parameter, if provided
    2. The current context (async when called inside an async function)
    3. Otherwise, the async client
    """


from prefect.client.orchestration import get_client
# Default usage: the returned client is asynchronous
async def async_example():
    """Fetch flows through the default (async) client and return them."""
    async_client = get_client()
    return await async_client.read_flows()
# Explicitly request the synchronous client
def sync_example():
    """Fetch flows through a synchronous client and return them."""
    blocking_client = get_client(sync_client=True)
    return blocking_client.read_flows()
# Client with custom HTTPX settings
client = get_client(httpx_settings={
"timeout": 30.0,
"limits": {"max_connections": 10}
})Asynchronous HTTP client for Prefect API operations with full async/await support.
class PrefectClient:
    """
    Asynchronous HTTP client for the Prefect API.

    Exposes Prefect server functionality (flows, deployments, flow runs,
    task runs, work pools, and orchestration) through awaitable methods.
    """

    def __init__(
        self,
        api: str = None,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: dict = None,
    ):
        """
        Initialize the Prefect client.

        Parameters:
        - api: Prefect API URL (defaults to PREFECT_API_URL setting)
        - api_key: API key for authentication (defaults to PREFECT_API_KEY setting)
        - api_version: API version to use (defaults to current version)
        - httpx_settings: Custom HTTPX client configuration
        """

    async def create_flow_run(
        self,
        flow: Flow,
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: Dict[str, Any] = None,
        tags: List[str] = None,
        parent_task_run_id: UUID = None,
        state: State = None,
    ) -> FlowRun:
        """
        Create a new flow run.

        Parameters:
        - flow: Flow object to create a run for
        - name: Custom name for the flow run
        - parameters: Parameters to pass to the flow
        - context: Context data for the run
        - tags: Tags to apply to the flow run
        - parent_task_run_id: Parent task run if this is a subflow
        - state: Initial state for the flow run

        Returns:
        Created FlowRun object
        """

    async def read_flows(
        self,
        limit: int = None,
        offset: int = None,
        sort: FlowSort = None,
        flow_filter: FlowFilter = None,
    ) -> List[Flow]:
        """
        Read flows from the API.

        Parameters:
        - limit: Maximum number of flows to return
        - offset: Number of flows to skip
        - sort: Sorting configuration
        - flow_filter: Filter criteria for the flows to return

        Returns:
        List of Flow objects
        """

    async def read_deployments(
        self,
        limit: int = None,
        offset: int = None,
        sort: DeploymentSort = None,
        flow_filter: FlowFilter = None,
        deployment_filter: DeploymentFilter = None,
    ) -> List[Deployment]:
        """
        Read deployments from the API.

        Parameters:
        - limit: Maximum number of deployments to return
        - offset: Number of deployments to skip
        - sort: Sorting configuration
        - flow_filter: Filter for associated flows
        - deployment_filter: Filter for deployments

        Returns:
        List of Deployment objects
        """

    async def create_deployment(
        self,
        flow_id: UUID,
        name: str,
        version: str = None,
        schedule: Union[CronSchedule, IntervalSchedule] = None,
        parameters: Dict[str, Any] = None,
        tags: List[str] = None,
        work_pool_name: str = None,
        work_queue_name: str = None,
        **kwargs
    ) -> Deployment:
        """
        Create a new deployment.

        Parameters:
        - flow_id: ID of the flow to deploy
        - name: Name for the deployment
        - version: Version string
        - schedule: Scheduling configuration
        - parameters: Default parameters
        - tags: Deployment tags
        - work_pool_name: Target work pool
        - work_queue_name: Target work queue

        Returns:
        Created Deployment object
        """

    async def read_work_pools(
        self,
        limit: int = None,
        offset: int = None,
        work_pool_filter: WorkPoolFilter = None,
    ) -> List[WorkPool]:
        """
        Read work pools from the API.

        Parameters:
        - limit: Maximum number of work pools to return
        - offset: Number of work pools to skip
        - work_pool_filter: Filter criteria

        Returns:
        List of WorkPool objects
        """

    async def create_work_pool(
        self,
        name: str,
        type: str,
        description: str = None,
        is_paused: bool = False,
        concurrency_limit: int = None,
        **kwargs
    ) -> WorkPool:
        """
        Create a new work pool.

        Parameters:
        - name: Work pool name
        - type: Work pool type (process, docker, kubernetes, etc.)
        - description: Work pool description
        - is_paused: Whether to start paused
        - concurrency_limit: Maximum concurrent workers

        Returns:
        Created WorkPool object
        """

    async def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a flow run.

        Parameters:
        - flow_run_id: ID of the flow run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
        OrchestrationResult with state change details
        """

    async def set_task_run_state(
        self,
        task_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a task run.

        Parameters:
        - task_run_id: ID of the task run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
        OrchestrationResult with state change details
        """


from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.objects import Flow, Deployment
from prefect.states import Completed
async def client_operations():
client = PrefectClient()
# List flows
flows = await client.read_flows(limit=10)
for flow in flows:
print(f"Flow: {flow.name}")
# Create a flow run
flow_run = await client.create_flow_run(
flow=flows[0],
parameters={"param1": "value1"},
tags=["api-created"]
)
# List deployments
deployments = await client.read_deployments()
# Update flow run state
result = await client.set_flow_run_state(
flow_run.id,
Completed(message="Completed via API")
)
return flow_runSynchronous HTTP client providing the same functionality as the async client but with blocking operations.
class SyncPrefectClient:
    """
    Synchronous HTTP client for the Prefect API.

    Mirrors the functionality of PrefectClient, but every method call
    blocks until it completes.
    """

    def __init__(
        self,
        api: str = None,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: dict = None,
    ):
        """Set up the synchronous Prefect client."""

    def create_flow_run(
        self,
        flow: Flow,
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: Dict[str, Any] = None,
        tags: List[str] = None,
        parent_task_run_id: UUID = None,
        state: State = None,
    ) -> FlowRun:
        """Synchronous counterpart of flow run creation."""

    def read_flows(
        self,
        limit: int = None,
        offset: int = None,
        sort: FlowSort = None,
    ) -> List[Flow]:
        """Synchronous counterpart of reading flows from the API."""

    def read_deployments(
        self,
        limit: int = None,
        offset: int = None,
        sort: DeploymentSort = None,
        flow_filter: FlowFilter = None,
        deployment_filter: DeploymentFilter = None,
    ) -> List[Deployment]:
        """Synchronous counterpart of reading deployments from the API."""

    def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """Synchronous counterpart of setting a flow run's state."""


from prefect.client.orchestration import SyncPrefectClient
from prefect.states import Running
def sync_client_operations():
client = SyncPrefectClient()
# All operations are synchronous
flows = client.read_flows(limit=5)
if flows:
flow_run = client.create_flow_run(
flow=flows[0],
parameters={"sync": True}
)
# Update state
client.set_flow_run_state(
flow_run.id,
Running(message="Started via sync client")
)
return flow_runSpecialized client for Prefect Cloud services with additional cloud-specific functionality.
def get_cloud_client(
    host: str = None,
    api_key: str = None,
    httpx_settings: dict = None,
    infer_cloud_url: bool = True,
) -> CloudClient:
    """
    Return a client for Prefect Cloud.

    Parameters:
    - host: Prefect Cloud host URL
    - api_key: Cloud API key
    - httpx_settings: Custom HTTPX configuration
    - infer_cloud_url: Whether to automatically detect cloud URL

    Returns:
    CloudClient instance for Prefect Cloud operations
    """
class CloudClient:
"""
Client for Prefect Cloud services.
Extends PrefectClient with cloud-specific functionality like
workspace management, user operations, and cloud settings.
"""
async def read_workspaces(self) -> List[Workspace]:
"""Read available workspaces."""
async def create_workspace(
self,
name: str,
description: str = None,
) -> Workspace:
"""Create a new workspace."""
async def read_current_user(self) -> User:
"""Read current user information."""
async def read_workspace_settings(
self,
workspace_id: UUID,
) -> WorkspaceSettings:
"""Read workspace configuration settings."""Exception classes for handling client API errors.
class PrefectHTTPStatusError(Exception):
    """Error raised for an HTTP status failure returned by the Prefect API."""

    def __init__(
        self,
        message: str,
        response: httpx.Response,
        request: httpx.Request = None,
    ):
        """Build the error from a message, the response, and optionally the request."""

    @property
    def status_code(self) -> int:
        """The HTTP status code."""
class ObjectNotFound(PrefectHTTPStatusError):
    """Signals that the requested object could not be found."""
class ValidationError(Exception):
    """Signals that request data did not pass validation."""
class AuthenticationError(PrefectHTTPStatusError):
    """Signals that authentication failed."""


from prefect.client.orchestration import get_client
from prefect.exceptions import ObjectNotFound, AuthenticationError
async def error_handling_example():
client = get_client()
try:
# This might raise ObjectNotFound
flow = await client.read_flow_by_name("non-existent-flow")
except ObjectNotFound:
print("Flow not found")
except AuthenticationError:
print("Authentication failed")
except Exception as e:
print(f"Unexpected error: {e}")Query filters and sorting options for API operations.
class FlowFilter:
    """Criteria used to filter flow queries."""

    def __init__(
        self,
        name: Union[str, List[str]] = None,
        tags: Union[str, List[str]] = None,
        id: Union[UUID, List[UUID]] = None,
    ):
        """Set up the filter from optional name, tag, and id criteria."""
class DeploymentFilter:
    """Criteria used to filter deployment queries."""

    def __init__(
        self,
        name: Union[str, List[str]] = None,
        tags: Union[str, List[str]] = None,
        work_pool_name: Union[str, List[str]] = None,
        is_schedule_active: bool = None,
    ):
        """Set up the filter from optional name, tag, work pool, and schedule criteria."""
class FlowSort:
    """Sort order applied to flow queries."""

    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """
        Set up flow sorting.

        Parameters:
        - field: Field to sort by (name, created, updated)
        - direction: Sort direction (asc, desc)
        """
class DeploymentSort:
    """Sort order applied to deployment queries."""

    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """Set up deployment sorting from a field and a direction."""


from prefect.client.schemas.filters import FlowFilter, DeploymentFilter
from prefect.client.schemas.sorting import FlowSort
async def filtering_example():
client = get_client()
# Filter flows by tags
flow_filter = FlowFilter(tags=["production", "etl"])
flows = await client.read_flows(
flow_filter=flow_filter,
sort=FlowSort(field="name", direction="asc")
)
# Filter deployments by work pool
deployment_filter = DeploymentFilter(
work_pool_name="kubernetes-pool",
is_schedule_active=True
)
deployments = await client.read_deployments(
deployment_filter=deployment_filter,
limit=20
)Types specific to client API operations:
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import httpx
class OrchestrationResult:
    """Outcome returned by an orchestration operation."""

    # Declared fields: a state, a set-state status, and orchestration details.
    state: State
    status: SetStateStatus
    details: OrchestrationDetails
class SetStateStatus(str, Enum):
    """Possible statuses of a state change operation."""

    # Each member's value is its own name as a string.
    ACCEPT = "ACCEPT"
    REJECT = "REJECT"
    ABORT = "ABORT"
    WAIT = "WAIT"
class OrchestrationDetails:
    """Detail record describing orchestration decisions."""

    # All three identifiers are optional.
    flow_run_id: Optional[UUID]
    task_run_id: Optional[UUID]
    transition_id: Optional[UUID]
class Workspace:
    """A workspace in Prefect Cloud."""

    # Identity and descriptive fields
    id: UUID
    name: str
    description: Optional[str]
    # Timestamps
    created: datetime
    updated: datetime
class User:
    """A user of Prefect Cloud."""

    # Identity fields
    id: UUID
    email: str
    name: str
    # Timestamp
    created: datetime
class WorkspaceSettings:
    """Configuration settings belonging to a workspace."""

    # The workspace ID plus a string-keyed settings mapping.
    workspace_id: UUID
    settings: Dict[str, Any]
# Filter and sort types
class FlowRunFilter:
    """Filter type for flow run queries (no fields declared here)."""
class TaskRunFilter:
    """Filter type for task run queries (no fields declared here)."""
class WorkPoolFilter:
"""Filter for work pool queries."""
passInstall with Tessl CLI
npx tessl i tessl/pypi-prefect