or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.md configuration.md context-utilities.md core-workflows.md deployments.md index.md runtime-context.md state-management.md variables.md

docs/client-api.md

0

# Client API

1

2

Prefect's client API provides programmatic access to the Prefect server and cloud services through HTTP clients. This enables interaction with flows, deployments, work pools, and orchestration services from external applications and scripts.

3

4

## Capabilities

5

6

### Client Factory

7

8

Get HTTP client instances for interacting with Prefect API services.

9

10

```python { .api }

11

def get_client(

12

httpx_settings: dict = None,

13

sync_client: bool = None,

14

) -> Union[PrefectClient, SyncPrefectClient]:

15

"""

16

Get a Prefect HTTP client for API interaction.

17

18

Parameters:

19

- httpx_settings: Custom HTTPX client configuration

20

- sync_client: Whether to return a synchronous client (defaults to async)

21

22

Returns:

23

PrefectClient (async) or SyncPrefectClient (sync) instance

24

25

The client type is determined by:

26

1. sync_client parameter if provided

27

2. Current context (async if in async function)

28

3. Defaults to async client

29

"""

30

```

31

32

#### Usage Examples

33

34

```python

35

from prefect.client.orchestration import get_client

36

37

# Get async client (default)

38

async def async_example():

39

client = get_client()

40

flows = await client.read_flows()

41

return flows

42

43

# Get sync client explicitly

44

def sync_example():

45

client = get_client(sync_client=True)

46

flows = client.read_flows()

47

return flows

48

49

# Client with custom HTTPX settings (httpx expects `limits` as an httpx.Limits object)

import httpx

50

client = get_client(httpx_settings={

51

"timeout": 30.0,

52

"limits": httpx.Limits(max_connections=10)

53

})

54

```

55

56

### Async Prefect Client

57

58

Asynchronous HTTP client for Prefect API operations with full async/await support.

59

60

```python { .api }

61

class PrefectClient:

62

"""

63

Asynchronous HTTP client for the Prefect API.

64

65

Provides comprehensive access to Prefect server functionality including

66

flows, deployments, flow runs, task runs, work pools, and orchestration.

67

"""

68

69

def __init__(

70

self,

71

api: str = None,

72

api_key: str = None,

73

api_version: str = None,

74

httpx_settings: dict = None,

75

):

76

"""

77

Initialize the Prefect client.

78

79

Parameters:

80

- api: Prefect API URL (defaults to PREFECT_API_URL setting)

81

- api_key: API key for authentication (defaults to PREFECT_API_KEY setting)

82

- api_version: API version to use (defaults to current version)

83

- httpx_settings: Custom HTTPX client configuration

84

"""

85

86

async def create_flow_run(

87

self,

88

flow: Flow,

89

name: str = None,

90

parameters: Dict[str, Any] = None,

91

context: Dict[str, Any] = None,

92

tags: List[str] = None,

93

parent_task_run_id: UUID = None,

94

state: State = None,

95

) -> FlowRun:

96

"""

97

Create a new flow run.

98

99

Parameters:

100

- flow: Flow object to create a run for

101

- name: Custom name for the flow run

102

- parameters: Parameters to pass to the flow

103

- context: Context data for the run

104

- tags: Tags to apply to the flow run

105

- parent_task_run_id: Parent task run if this is a subflow

106

- state: Initial state for the flow run

107

108

Returns:

109

Created FlowRun object

110

"""

111

112

async def read_flows(

113

self,

114

limit: int = None,

115

offset: int = None,

116

sort: FlowSort = None,

flow_filter: FlowFilter = None,

117

) -> List[Flow]:

118

"""

119

Read flows from the API.

120

121

Parameters:

122

- limit: Maximum number of flows to return

123

- offset: Number of flows to skip

124

- sort: Sorting configuration

- flow_filter: Filter criteria for the flows to return

125

126

Returns:

127

List of Flow objects

128

"""

129

130

async def read_deployments(

131

self,

132

limit: int = None,

133

offset: int = None,

134

sort: DeploymentSort = None,

135

flow_filter: FlowFilter = None,

136

deployment_filter: DeploymentFilter = None,

137

) -> List[Deployment]:

138

"""

139

Read deployments from the API.

140

141

Parameters:

142

- limit: Maximum number of deployments to return

143

- offset: Number of deployments to skip

144

- sort: Sorting configuration

145

- flow_filter: Filter for associated flows

146

- deployment_filter: Filter for deployments

147

148

Returns:

149

List of Deployment objects

150

"""

151

152

async def create_deployment(

153

self,

154

flow_id: UUID,

155

name: str,

156

version: str = None,

157

schedule: Union[CronSchedule, IntervalSchedule] = None,

158

parameters: Dict[str, Any] = None,

159

tags: List[str] = None,

160

work_pool_name: str = None,

161

work_queue_name: str = None,

162

**kwargs

163

) -> Deployment:

164

"""

165

Create a new deployment.

166

167

Parameters:

168

- flow_id: ID of the flow to deploy

169

- name: Name for the deployment

170

- version: Version string

171

- schedule: Scheduling configuration

172

- parameters: Default parameters

173

- tags: Deployment tags

174

- work_pool_name: Target work pool

175

- work_queue_name: Target work queue

176

177

Returns:

178

Created Deployment object

179

"""

180

181

async def read_work_pools(

182

self,

183

limit: int = None,

184

offset: int = None,

185

work_pool_filter: WorkPoolFilter = None,

186

) -> List[WorkPool]:

187

"""

188

Read work pools from the API.

189

190

Parameters:

191

- limit: Maximum number of work pools to return

192

- offset: Number of work pools to skip

193

- work_pool_filter: Filter criteria

194

195

Returns:

196

List of WorkPool objects

197

"""

198

199

async def create_work_pool(

200

self,

201

name: str,

202

type: str,

203

description: str = None,

204

is_paused: bool = False,

205

concurrency_limit: int = None,

206

**kwargs

207

) -> WorkPool:

208

"""

209

Create a new work pool.

210

211

Parameters:

212

- name: Work pool name

213

- type: Work pool type (process, docker, kubernetes, etc.)

214

- description: Work pool description

215

- is_paused: Whether to start paused

216

- concurrency_limit: Maximum number of concurrent flow runs for the work pool

217

218

Returns:

219

Created WorkPool object

220

"""

221

222

async def set_flow_run_state(

223

self,

224

flow_run_id: UUID,

225

state: State,

226

force: bool = False,

227

) -> OrchestrationResult:

228

"""

229

Set the state of a flow run.

230

231

Parameters:

232

- flow_run_id: ID of the flow run

233

- state: New state to set

234

- force: Whether to force the state change

235

236

Returns:

237

OrchestrationResult with state change details

238

"""

239

240

async def set_task_run_state(

241

self,

242

task_run_id: UUID,

243

state: State,

244

force: bool = False,

245

) -> OrchestrationResult:

246

"""

247

Set the state of a task run.

248

249

Parameters:

250

- task_run_id: ID of the task run

251

- state: New state to set

252

- force: Whether to force the state change

253

254

Returns:

255

OrchestrationResult with state change details

256

"""

257

```

258

259

#### Usage Examples

260

261

```python

262

from prefect.client.orchestration import PrefectClient

263

from prefect.client.schemas.objects import Flow, Deployment

264

from prefect.states import Completed

265

266

async def client_operations():

267

client = PrefectClient()

268

269

# List flows

270

flows = await client.read_flows(limit=10)

271

for flow in flows:

272

print(f"Flow: {flow.name}")

273

274

# Create a flow run

275

flow_run = await client.create_flow_run(

276

flow=flows[0],

277

parameters={"param1": "value1"},

278

tags=["api-created"]

279

)

280

281

# List deployments

282

deployments = await client.read_deployments()

283

284

# Update flow run state

285

result = await client.set_flow_run_state(

286

flow_run.id,

287

Completed(message="Completed via API")

288

)

289

290

return flow_run

291

```

292

293

### Sync Prefect Client

294

295

Synchronous HTTP client providing the same functionality as the async client but with blocking operations.

296

297

```python { .api }

298

class SyncPrefectClient:

299

"""

300

Synchronous HTTP client for the Prefect API.

301

302

Provides the same functionality as PrefectClient but with

303

synchronous method calls that block until completion.

304

"""

305

306

def __init__(

307

self,

308

api: str = None,

309

api_key: str = None,

310

api_version: str = None,

311

httpx_settings: dict = None,

312

):

313

"""Initialize the synchronous Prefect client."""

314

315

def create_flow_run(

316

self,

317

flow: Flow,

318

name: str = None,

319

parameters: Dict[str, Any] = None,

320

context: Dict[str, Any] = None,

321

tags: List[str] = None,

322

parent_task_run_id: UUID = None,

323

state: State = None,

324

) -> FlowRun:

325

"""Create a new flow run (synchronous version)."""

326

327

def read_flows(

328

self,

329

limit: int = None,

330

offset: int = None,

331

sort: FlowSort = None,

332

) -> List[Flow]:

333

"""Read flows from the API (synchronous version)."""

334

335

def read_deployments(

336

self,

337

limit: int = None,

338

offset: int = None,

339

sort: DeploymentSort = None,

340

flow_filter: FlowFilter = None,

341

deployment_filter: DeploymentFilter = None,

342

) -> List[Deployment]:

343

"""Read deployments from the API (synchronous version)."""

344

345

def set_flow_run_state(

346

self,

347

flow_run_id: UUID,

348

state: State,

349

force: bool = False,

350

) -> OrchestrationResult:

351

"""Set flow run state (synchronous version)."""

352

```

353

354

#### Usage Examples

355

356

```python

357

from prefect.client.orchestration import SyncPrefectClient

358

from prefect.states import Running

359

360

def sync_client_operations():

361

client = SyncPrefectClient()

362

363

# All operations are synchronous

364

flows = client.read_flows(limit=5)

365

366

if flows:

367

flow_run = client.create_flow_run(

368

flow=flows[0],

369

parameters={"sync": True}

370

)

371

372

# Update state

373

client.set_flow_run_state(

374

flow_run.id,

375

Running(message="Started via sync client")

376

)

377

378

return flow_run

379

```

380

381

### Cloud Client

382

383

Specialized client for Prefect Cloud services with additional cloud-specific functionality.

384

385

```python { .api }

386

def get_cloud_client(

387

host: str = None,

388

api_key: str = None,

389

httpx_settings: dict = None,

390

infer_cloud_url: bool = True,

391

) -> CloudClient:

392

"""

393

Get a Prefect Cloud client.

394

395

Parameters:

396

- host: Prefect Cloud host URL

397

- api_key: Cloud API key

398

- httpx_settings: Custom HTTPX configuration

399

- infer_cloud_url: Whether to automatically detect cloud URL

400

401

Returns:

402

CloudClient instance for Prefect Cloud operations

403

"""

404

405

class CloudClient:

406

"""

407

Client for Prefect Cloud services.

408

409

Extends PrefectClient with cloud-specific functionality like

410

workspace management, user operations, and cloud settings.

411

"""

412

413

async def read_workspaces(self) -> List[Workspace]:

414

"""Read available workspaces."""

415

416

async def create_workspace(

417

self,

418

name: str,

419

description: str = None,

420

) -> Workspace:

421

"""Create a new workspace."""

422

423

async def read_current_user(self) -> User:

424

"""Read current user information."""

425

426

async def read_workspace_settings(

427

self,

428

workspace_id: UUID,

429

) -> WorkspaceSettings:

430

"""Read workspace configuration settings."""

431

```

432

433

### Error Handling

434

435

Exception classes for handling client API errors.

436

437

```python { .api }

438

class PrefectHTTPStatusError(Exception):

439

"""HTTP status error from Prefect API."""

440

441

def __init__(

442

self,

443

message: str,

444

response: httpx.Response,

445

request: httpx.Request = None,

446

):

447

"""Initialize HTTP status error."""

448

449

@property

450

def status_code(self) -> int:

451

"""HTTP status code."""

452

453

class ObjectNotFound(PrefectHTTPStatusError):

454

"""Raised when a requested object is not found."""

455

456

class ValidationError(Exception):

457

"""Raised when request data fails validation."""

458

459

class AuthenticationError(PrefectHTTPStatusError):

460

"""Raised when authentication fails."""

461

```

462

463

#### Usage Examples

464

465

```python

466

from prefect.client.orchestration import get_client

467

from prefect.exceptions import ObjectNotFound, AuthenticationError

468

469

async def error_handling_example():

470

client = get_client()

471

472

try:

473

# This might raise ObjectNotFound

474

flow = await client.read_flow_by_name("non-existent-flow")

475

except ObjectNotFound:

476

print("Flow not found")

477

except AuthenticationError:

478

print("Authentication failed")

479

except Exception as e:

480

print(f"Unexpected error: {e}")

481

```

482

483

### Filtering and Sorting

484

485

Query filters and sorting options for API operations.

486

487

```python { .api }

488

class FlowFilter:

489

"""Filter criteria for flow queries."""

490

491

def __init__(

492

self,

493

name: Union[str, List[str]] = None,

494

tags: Union[str, List[str]] = None,

495

id: Union[UUID, List[UUID]] = None,

496

):

497

"""Initialize flow filter."""

498

499

class DeploymentFilter:

500

"""Filter criteria for deployment queries."""

501

502

def __init__(

503

self,

504

name: Union[str, List[str]] = None,

505

tags: Union[str, List[str]] = None,

506

work_pool_name: Union[str, List[str]] = None,

507

is_schedule_active: bool = None,

508

):

509

"""Initialize deployment filter."""

510

511

class FlowSort:

512

"""Sorting configuration for flow queries."""

513

514

def __init__(

515

self,

516

field: str,

517

direction: str = "asc",

518

):

519

"""

520

Initialize flow sorting.

521

522

Parameters:

523

- field: Field to sort by (name, created, updated)

524

- direction: Sort direction (asc, desc)

525

"""

526

527

class DeploymentSort:

528

"""Sorting configuration for deployment queries."""

529

530

def __init__(

531

self,

532

field: str,

533

direction: str = "asc",

534

):

535

"""Initialize deployment sorting."""

536

```

537

538

#### Usage Examples

539

540

```python

541

from prefect.client.orchestration import get_client

from prefect.client.schemas.filters import FlowFilter, DeploymentFilter

542

from prefect.client.schemas.sorting import FlowSort

543

544

async def filtering_example():

545

client = get_client()

546

547

# Filter flows by tags

548

flow_filter = FlowFilter(tags=["production", "etl"])

549

flows = await client.read_flows(

550

flow_filter=flow_filter,

551

sort=FlowSort(field="name", direction="asc")

552

)

553

554

# Filter deployments by work pool

555

deployment_filter = DeploymentFilter(

556

work_pool_name="kubernetes-pool",

557

is_schedule_active=True

558

)

559

deployments = await client.read_deployments(

560

deployment_filter=deployment_filter,

561

limit=20

562

)

563

```

564

565

## Types

566

567

Types specific to client API operations:

568

569

```python { .api }

570

from typing import Any, Dict, List, Optional, Union

571

from uuid import UUID

572

from datetime import datetime

from enum import Enum

573

import httpx

574

575

class OrchestrationResult:

576

"""Result of an orchestration operation."""

577

state: State

578

status: SetStateStatus

579

details: OrchestrationDetails

580

581

class SetStateStatus(str, Enum):

582

"""Status of a state change operation."""

583

ACCEPT = "ACCEPT"

584

REJECT = "REJECT"

585

ABORT = "ABORT"

586

WAIT = "WAIT"

587

588

class OrchestrationDetails:

589

"""Details about orchestration decisions."""

590

flow_run_id: Optional[UUID]

591

task_run_id: Optional[UUID]

592

transition_id: Optional[UUID]

593

594

class Workspace:

595

"""Prefect Cloud workspace."""

596

id: UUID

597

name: str

598

description: Optional[str]

599

created: datetime

600

updated: datetime

601

602

class User:

603

"""Prefect Cloud user."""

604

id: UUID

605

email: str

606

name: str

607

created: datetime

608

609

class WorkspaceSettings:

610

"""Workspace configuration settings."""

611

workspace_id: UUID

612

settings: Dict[str, Any]

613

614

# Filter and sort types

615

class FlowRunFilter:

616

"""Filter for flow run queries."""

617

pass

618

619

class TaskRunFilter:

620

"""Filter for task run queries."""

621

pass

622

623

class WorkPoolFilter:

624

"""Filter for work pool queries."""

625

pass

626

```