0
# Client API
1
2
Prefect's client API provides programmatic access to the Prefect server and cloud services through HTTP clients. This enables interaction with flows, deployments, work pools, and orchestration services from external applications and scripts.
3
4
## Capabilities
5
6
### Client Factory
7
8
Get HTTP client instances for interacting with Prefect API services.
9
10
```python { .api }
11
def get_client(
    httpx_settings: Optional[dict] = None,
    sync_client: bool = False,
) -> Union[PrefectClient, SyncPrefectClient]:
    """
    Get a Prefect HTTP client for API interaction.

    Parameters:
    - httpx_settings: Custom HTTPX client configuration
    - sync_client: Return a SyncPrefectClient when True; the default
      (False) returns the asynchronous PrefectClient

    Returns:
        PrefectClient (async) or SyncPrefectClient (sync) instance

    The client type is selected only by the sync_client argument. It is not
    inferred from whether the caller is running inside a coroutine.
    """
30
```
31
32
#### Usage Examples
33
34
```python
35
from prefect.client.orchestration import get_client
36
37
# Get async client (default)
38
async def async_example():
    # Entering the client as a context manager starts it and guarantees the
    # underlying HTTP connection is closed when the block exits.
    async with get_client() as client:
        return await client.read_flows()
42
43
# Get sync client explicitly
44
def sync_example():
    # The synchronous client is a context manager too; leaving the block
    # closes its HTTP connection.
    with get_client(sync_client=True) as client:
        return client.read_flows()
48
49
# Client with custom HTTPX settings
import httpx

client = get_client(httpx_settings={
    "timeout": 30.0,
    # httpx requires an httpx.Limits instance here; a plain dict is rejected
    "limits": httpx.Limits(max_connections=10),
})
54
```
55
56
### Async Prefect Client
57
58
Asynchronous HTTP client for Prefect API operations with full async/await support.
59
60
```python { .api }
61
class PrefectClient:
    """
    Asynchronous HTTP client for the Prefect API.

    Provides comprehensive access to Prefect server functionality including
    flows, deployments, flow runs, task runs, work pools, and orchestration.
    """

    def __init__(
        self,
        api: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict] = None,
    ):
        """
        Initialize the Prefect client.

        Parameters:
        - api: Prefect API URL (defaults to PREFECT_API_URL setting)
        - api_key: API key for authentication (defaults to PREFECT_API_KEY setting)
        - api_version: API version to use (defaults to current version)
        - httpx_settings: Custom HTTPX client configuration
        """

    async def create_flow_run(
        self,
        flow: Flow,
        name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        parent_task_run_id: Optional[UUID] = None,
        state: Optional[State] = None,
    ) -> FlowRun:
        """
        Create a new flow run.

        Parameters:
        - flow: Flow object to create a run for
        - name: Custom name for the flow run
        - parameters: Parameters to pass to the flow
        - context: Context data for the run
        - tags: Tags to apply to the flow run
        - parent_task_run_id: Parent task run if this is a subflow
        - state: Initial state for the flow run

        Returns:
            Created FlowRun object
        """

    async def read_flows(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[FlowSort] = None,
        flow_filter: Optional[FlowFilter] = None,
    ) -> List[Flow]:
        """
        Read flows from the API.

        Parameters:
        - limit: Maximum number of flows to return
        - offset: Number of flows to skip
        - sort: Sorting configuration
        - flow_filter: Filter criteria for the flows to return

        Returns:
            List of Flow objects
        """

    async def read_deployments(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[DeploymentSort] = None,
        flow_filter: Optional[FlowFilter] = None,
        deployment_filter: Optional[DeploymentFilter] = None,
    ) -> List[Deployment]:
        """
        Read deployments from the API.

        Parameters:
        - limit: Maximum number of deployments to return
        - offset: Number of deployments to skip
        - sort: Sorting configuration
        - flow_filter: Filter for associated flows
        - deployment_filter: Filter for deployments

        Returns:
            List of Deployment objects
        """

    async def create_deployment(
        self,
        flow_id: UUID,
        name: str,
        version: Optional[str] = None,
        schedule: Optional[Union[CronSchedule, IntervalSchedule]] = None,
        parameters: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        **kwargs
    ) -> Deployment:
        """
        Create a new deployment.

        Parameters:
        - flow_id: ID of the flow to deploy
        - name: Name for the deployment
        - version: Version string
        - schedule: Scheduling configuration
        - parameters: Default parameters
        - tags: Deployment tags
        - work_pool_name: Target work pool
        - work_queue_name: Target work queue

        Returns:
            Created Deployment object
        """

    async def read_work_pools(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        work_pool_filter: Optional[WorkPoolFilter] = None,
    ) -> List[WorkPool]:
        """
        Read work pools from the API.

        Parameters:
        - limit: Maximum number of work pools to return
        - offset: Number of work pools to skip
        - work_pool_filter: Filter criteria

        Returns:
            List of WorkPool objects
        """

    async def create_work_pool(
        self,
        name: str,
        type: str,
        description: Optional[str] = None,
        is_paused: bool = False,
        concurrency_limit: Optional[int] = None,
        **kwargs
    ) -> WorkPool:
        """
        Create a new work pool.

        Parameters:
        - name: Work pool name
        - type: Work pool type (process, docker, kubernetes, etc.)
        - description: Work pool description
        - is_paused: Whether to start paused
        - concurrency_limit: Maximum number of concurrent flow runs

        Returns:
            Created WorkPool object
        """

    async def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a flow run.

        Parameters:
        - flow_run_id: ID of the flow run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
            OrchestrationResult with state change details
        """

    async def set_task_run_state(
        self,
        task_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """
        Set the state of a task run.

        Parameters:
        - task_run_id: ID of the task run
        - state: New state to set
        - force: Whether to force the state change

        Returns:
            OrchestrationResult with state change details
        """
257
```
258
259
#### Usage Examples
260
261
```python
262
from prefect.client.orchestration import PrefectClient, get_client
from prefect.client.schemas.objects import Flow, Deployment
from prefect.states import Completed

async def client_operations():
    # get_client() returns a PrefectClient configured from the active Prefect
    # settings; `async with` closes its HTTP connection on exit.
    async with get_client() as client:
        # List flows
        flows = await client.read_flows(limit=10)
        for flow in flows:
            print(f"Flow: {flow.name}")

        # Nothing to run when no flows are registered
        if not flows:
            return None

        # Create a flow run
        flow_run = await client.create_flow_run(
            flow=flows[0],
            parameters={"param1": "value1"},
            tags=["api-created"],
        )

        # List deployments
        deployments = await client.read_deployments()

        # Update flow run state
        result = await client.set_flow_run_state(
            flow_run.id,
            Completed(message="Completed via API"),
        )

        return flow_run
291
```
292
293
### Sync Prefect Client
294
295
Synchronous HTTP client providing the same functionality as the async client but with blocking operations.
296
297
```python { .api }
298
class SyncPrefectClient:
    """
    Synchronous HTTP client for the Prefect API.

    Provides the same functionality as PrefectClient but with
    synchronous method calls that block until completion.
    """

    def __init__(
        self,
        api: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict] = None,
    ):
        """Initialize the synchronous Prefect client."""

    def create_flow_run(
        self,
        flow: Flow,
        name: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        context: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        parent_task_run_id: Optional[UUID] = None,
        state: Optional[State] = None,
    ) -> FlowRun:
        """Create a new flow run (synchronous version)."""

    def read_flows(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[FlowSort] = None,
        flow_filter: Optional[FlowFilter] = None,
    ) -> List[Flow]:
        """Read flows from the API (synchronous version)."""

    def read_deployments(
        self,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        sort: Optional[DeploymentSort] = None,
        flow_filter: Optional[FlowFilter] = None,
        deployment_filter: Optional[DeploymentFilter] = None,
    ) -> List[Deployment]:
        """Read deployments from the API (synchronous version)."""

    def set_flow_run_state(
        self,
        flow_run_id: UUID,
        state: State,
        force: bool = False,
    ) -> OrchestrationResult:
        """Set flow run state (synchronous version)."""
352
```
353
354
#### Usage Examples
355
356
```python
357
from prefect.client.orchestration import SyncPrefectClient, get_client
from prefect.states import Running

def sync_client_operations():
    # get_client(sync_client=True) returns a SyncPrefectClient configured from
    # the active Prefect settings; `with` closes its HTTP connection on exit.
    with get_client(sync_client=True) as client:
        # All operations are synchronous
        flows = client.read_flows(limit=5)

        # Nothing to run when no flows are registered
        if not flows:
            return None

        flow_run = client.create_flow_run(
            flow=flows[0],
            parameters={"sync": True},
        )

        # Update state
        client.set_flow_run_state(
            flow_run.id,
            Running(message="Started via sync client"),
        )

        return flow_run
379
```
380
381
### Cloud Client
382
383
Specialized client for Prefect Cloud services with additional cloud-specific functionality.
384
385
```python { .api }
386
def get_cloud_client(
    host: Optional[str] = None,
    api_key: Optional[str] = None,
    httpx_settings: Optional[dict] = None,
    infer_cloud_url: bool = True,
) -> CloudClient:
    """
    Get a Prefect Cloud client.

    Parameters:
    - host: Prefect Cloud host URL
    - api_key: Cloud API key
    - httpx_settings: Custom HTTPX configuration
    - infer_cloud_url: Whether to automatically detect cloud URL

    Returns:
        CloudClient instance for Prefect Cloud operations
    """
404
405
class CloudClient:
    """
    Client for Prefect Cloud services.

    Extends PrefectClient with cloud-specific functionality like
    workspace management, user operations, and cloud settings.
    """

    async def read_workspaces(self) -> List[Workspace]:
        """Read available workspaces."""

    async def create_workspace(
        self,
        name: str,
        description: Optional[str] = None,
    ) -> Workspace:
        """Create a new workspace."""

    async def read_current_user(self) -> User:
        """Read current user information."""

    async def read_workspace_settings(
        self,
        workspace_id: UUID,
    ) -> WorkspaceSettings:
        """Read workspace configuration settings."""
431
```
432
433
### Error Handling
434
435
Exception classes for handling client API errors.
436
437
```python { .api }
438
class PrefectHTTPStatusError(Exception):
    """HTTP status error from Prefect API."""

    def __init__(
        self,
        message: str,
        response: httpx.Response,
        request: Optional[httpx.Request] = None,
    ):
        """Initialize HTTP status error."""

    @property
    def status_code(self) -> int:
        """HTTP status code."""
452
453
class ObjectNotFound(PrefectHTTPStatusError):
    """Raised when a requested object is not found."""
455
456
class ValidationError(Exception):
    """Raised when request data fails validation."""
458
459
class AuthenticationError(PrefectHTTPStatusError):
    """Raised when authentication fails."""
461
```
462
463
#### Usage Examples
464
465
```python
466
from prefect.client.orchestration import get_client
467
from prefect.exceptions import ObjectNotFound, AuthenticationError
468
469
async def error_handling_example():
    # `async with` closes the client's HTTP connection even when a request
    # raises.
    async with get_client() as client:
        try:
            # This might raise ObjectNotFound
            flow = await client.read_flow_by_name("non-existent-flow")
        except ObjectNotFound:
            print("Flow not found")
        except AuthenticationError:
            print("Authentication failed")
        except Exception as e:
            print(f"Unexpected error: {e}")
481
```
482
483
### Filtering and Sorting
484
485
Query filters and sorting options for API operations.
486
487
```python { .api }
488
class FlowFilter:
    """Filter criteria for flow queries."""

    def __init__(
        self,
        name: Optional[Union[str, List[str]]] = None,
        tags: Optional[Union[str, List[str]]] = None,
        id: Optional[Union[UUID, List[UUID]]] = None,
    ):
        """Initialize flow filter."""
498
499
class DeploymentFilter:
    """Filter criteria for deployment queries."""

    def __init__(
        self,
        name: Optional[Union[str, List[str]]] = None,
        tags: Optional[Union[str, List[str]]] = None,
        work_pool_name: Optional[Union[str, List[str]]] = None,
        is_schedule_active: Optional[bool] = None,
    ):
        """Initialize deployment filter."""
510
511
class FlowSort:
    """Sorting configuration for flow queries."""

    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """
        Initialize flow sorting.

        Parameters:
        - field: Field to sort by (name, created, updated)
        - direction: Sort direction (asc, desc)
        """
526
527
class DeploymentSort:
    """Sorting configuration for deployment queries."""

    def __init__(
        self,
        field: str,
        direction: str = "asc",
    ):
        """
        Initialize deployment sorting.

        Parameters:
        - field: Field to sort by
        - direction: Sort direction (defaults to "asc")
        """
536
```
537
538
#### Usage Examples
539
540
```python
541
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, DeploymentFilter
from prefect.client.schemas.sorting import FlowSort

async def filtering_example():
    # `async with` closes the client's HTTP connection on exit.
    async with get_client() as client:
        # Filter flows by tags
        flow_filter = FlowFilter(tags=["production", "etl"])
        flows = await client.read_flows(
            flow_filter=flow_filter,
            sort=FlowSort(field="name", direction="asc"),
        )

        # Filter deployments by work pool
        deployment_filter = DeploymentFilter(
            work_pool_name="kubernetes-pool",
            is_schedule_active=True,
        )
        deployments = await client.read_deployments(
            deployment_filter=deployment_filter,
            limit=20,
        )
563
```
564
565
## Types
566
567
Types specific to client API operations:
568
569
```python { .api }
570
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

import httpx
574
575
class OrchestrationResult:
    """Result of an orchestration operation."""

    state: State
    status: SetStateStatus
    details: OrchestrationDetails
580
581
class SetStateStatus(str, Enum):
    """Status of a state change operation."""

    ACCEPT = "ACCEPT"
    REJECT = "REJECT"
    ABORT = "ABORT"
    WAIT = "WAIT"
587
588
class OrchestrationDetails:
    """Details about orchestration decisions."""

    flow_run_id: Optional[UUID]
    task_run_id: Optional[UUID]
    transition_id: Optional[UUID]
593
594
class Workspace:
    """Prefect Cloud workspace."""

    id: UUID
    name: str
    description: Optional[str]
    created: datetime
    updated: datetime
601
602
class User:
    """Prefect Cloud user."""

    id: UUID
    email: str
    name: str
    created: datetime
608
609
class WorkspaceSettings:
    """Workspace configuration settings."""

    workspace_id: UUID
    settings: Dict[str, Any]
613
614
# Additional filter types
615
class FlowRunFilter:
    """Filter for flow run queries."""
618
619
class TaskRunFilter:
    """Filter for task run queries."""
622
623
class WorkPoolFilter:
    """Filter for work pool queries."""
626
```