or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.md · configuration.md · context-utilities.md · core-workflows.md · deployments.md · index.md · runtime-context.md · state-management.md · variables.md

docs/state-management.md

0

# State Management

1

2

Prefect's state management system provides comprehensive control over workflow execution through immutable state objects and lifecycle functions. States represent the current status of flows and tasks, enabling fine-grained control over execution, retry logic, and error handling.

3

4

## Capabilities

5

6

### State Creation Functions

7

8

Factory functions for creating different types of states that represent various stages in the lifecycle of flows and tasks.

9

10

```python { .api }

11

def Completed(cls: type = State, **kwargs: Any) -> State:

12

"""

13

Create a completed state indicating successful execution.

14

15

Parameters:

16

- data: Result data from the completed operation

17

- name: Optional state name

18

- message: Optional descriptive message

19

- type: State type (defaults to COMPLETED)

20

21

Returns:

22

State object representing successful completion

23

"""

24

25

def Failed(cls: type = State, **kwargs: Any) -> State:

26

"""

27

Create a failed state indicating execution failure.

28

29

Parameters:

30

- data: Error information or exception details

31

- name: Optional state name

32

- message: Optional error message

33

- type: State type (defaults to FAILED)

34

35

Returns:

36

State object representing failure

37

"""

38

39

def Running(cls: type = State, **kwargs: Any) -> State:

40

"""

41

Create a running state indicating active execution.

42

43

Parameters:

44

- data: Optional data about the running operation

45

- name: Optional state name

46

- message: Optional status message

47

- type: State type (defaults to RUNNING)

48

49

Returns:

50

State object representing active execution

51

"""

52

53

def Scheduled(

54

scheduled_time: datetime = None,

55

name: str = None,

56

message: str = None,

57

type: StateType = None,

58

) -> State:

59

"""

60

Create a scheduled state for future execution.

61

62

Parameters:

63

- scheduled_time: When the operation is scheduled to run

64

- name: Optional state name

65

- message: Optional scheduling message

66

- type: State type (defaults to SCHEDULED)

67

68

Returns:

69

State object representing scheduled execution

70

"""

71

72

def Pending(cls: type = State, **kwargs: Any) -> State:

73

"""

74

Create a pending state for operations awaiting execution.

75

76

Parameters:

77

- name: Optional state name

78

- message: Optional pending message

79

- type: State type (defaults to PENDING)

80

81

Returns:

82

State object representing pending execution

83

"""

84

85

def Crashed(cls: type = State, **kwargs: Any) -> State:

86

"""

87

Create a crashed state for unexpected failures.

88

89

Parameters:

90

- data: Crash information or exception details

91

- name: Optional state name

92

- message: Optional crash message

93

- type: State type (defaults to CRASHED)

94

95

Returns:

96

State object representing system crash

97

"""

98

99

def Cancelled(cls: type = State, **kwargs: Any) -> State:

100

"""

101

Create a cancelled state for deliberately stopped operations.

102

103

Parameters:

104

- data: Optional cancellation data

105

- name: Optional state name

106

- message: Optional cancellation message

107

- type: State type (defaults to CANCELLED)

108

109

Returns:

110

State object representing cancellation

111

"""

112

113

def Cancelling(

114

data: Any = None,

115

name: str = None,

116

message: str = None,

117

type: StateType = None,

118

) -> State:

119

"""

120

Create a cancelling state for operations in the process of being cancelled.

121

122

Parameters:

123

- data: Optional cancellation data

124

- name: Optional state name

125

- message: Optional cancellation message

126

- type: State type (defaults to CANCELLING)

127

128

Returns:

129

State object representing active cancellation

130

"""

131

132

def Paused(

133

cls: type = State,

134

timeout_seconds: Optional[int] = None,

135

pause_expiration_time: Optional[datetime] = None,

136

reschedule: bool = False,

137

pause_key: Optional[str] = None,

138

**kwargs: Any,

139

) -> State:

140

"""

141

Create a paused state for temporarily halted operations.

142

143

Parameters:

144

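- timeout_seconds: Optional number of seconds the pause may last before it times out
- pause_expiration_time: Optional datetime at which the pause expires (alternative to timeout_seconds)
- reschedule: Whether to reschedule the flow run after pausing
- pause_key: Optional key identifying the pause point
- data: Optional pause data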

145

- name: Optional state name

146

- message: Optional pause message

147

- type: State type (defaults to PAUSED)

148

149

Returns:

150

State object representing paused execution

151

"""

152

153

def Suspended(

154

cls: type = State,

155

timeout_seconds: Optional[int] = None,

156

pause_expiration_time: Optional[datetime] = None,

157

pause_key: Optional[str] = None,

158

**kwargs: Any,

159

) -> State:

160

"""

161

Create a suspended state for long-term halted operations.

162

163

Parameters:

164

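- timeout_seconds: Optional number of seconds the suspension may last before it times out
- pause_expiration_time: Optional datetime at which the suspension expires (alternative to timeout_seconds)
- pause_key: Optional key identifying the suspension point
- data: Optional suspension data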

165

- name: Optional state name

166

- message: Optional suspension message

167

- type: State type (defaults to SUSPENDED)

168

169

Returns:

170

State object representing suspended execution

171

"""

172

173

def AwaitingRetry(

174

cls: type = State,

175

scheduled_time: Optional[datetime] = None,

176

**kwargs: Any,

177

) -> State:

178

"""

179

Create an awaiting retry state for operations scheduled to retry.

180

181

Parameters:

182

- scheduled_time: When the retry is scheduled

183

- name: Optional state name

184

- message: Optional retry message

185

- type: State type (defaults to AWAITING_RETRY)

186

187

Returns:

188

State object representing scheduled retry

189

"""

190

191

def Retrying(cls: type = State, **kwargs: Any) -> State:

192

"""

193

Create a retrying state for operations currently being retried.

194

195

Parameters:

196

- data: Optional retry data

197

- name: Optional state name

198

- message: Optional retry message

199

- type: State type (defaults to RETRYING)

200

201

Returns:

202

State object representing active retry

203

"""

204

205

def Late(

206

cls: type = State,

207

scheduled_time: Optional[datetime] = None,

208

**kwargs: Any,

209

) -> State:

210

"""

211

Create a late state for operations that missed their scheduled time.

212

213

Parameters:

214

- scheduled_time: When the operation was scheduled to run
- data: Optional lateness data

215

- name: Optional state name

216

- message: Optional lateness message

217

- type: State type (defaults to LATE)

218

219

Returns:

220

State object representing late execution

221

"""

222

```

223

224

#### Usage Examples

225

226

```python

227

from prefect import flow, task

228

from prefect.states import Completed, Failed, Running, Scheduled

229

from datetime import datetime, timedelta

230

231

@task

232

def process_data():

233

try:

234

# Processing logic

235

result = {"processed": 100}

236

return Completed(data=result, message="Processing successful")

237

except Exception as e:

238

return Failed(data=str(e), message="Processing failed")

239

240

@flow

241

def scheduled_workflow():

242

# Schedule for future execution

243

future_time = datetime.now() + timedelta(hours=1)

244

return Scheduled(scheduled_time=future_time, message="Scheduled for later")

245

246

# Using states in flow logic

247

@flow

248

def conditional_flow():

249

task_state = process_data(return_state=True)  # return_state=True yields the State rather than the unwrapped result

250

if task_state.is_completed():

251

return "Success"

252

elif task_state.is_failed():

253

return "Failure"

254

else:

255

return "Unknown"

256

```

257

258

### Flow Run Control

259

260

Functions for controlling the lifecycle of running flows, including pause, resume, and suspension operations.

261

262

```python { .api }

263

def pause_flow_run(

264

flow_run_id: Optional[UUID] = None,

265

timeout: int = 300,

266

poll_interval: int = 10,

267

reschedule: bool = False,

268

key: Optional[str] = None,

269

) -> None:

270

"""

271

Pause a flow run, halting execution until manually resumed.

272

273

Parameters:

274

- flow_run_id: ID of the flow run to pause (defaults to current run)

275

- timeout: Maximum time to wait for pause acknowledgment (seconds)

276

- poll_interval: Polling interval for pause status (seconds)

277

- reschedule: Whether to reschedule the flow run after pausing

278

- key: Optional key for identifying the pause point

279

280

Raises:

281

TimeoutError: If pause is not acknowledged within timeout

282

"""

283

284

def resume_flow_run(

285

flow_run_id: UUID,

286

run_input: Dict[str, Any] = None,

287

) -> None:

288

"""

289

Resume a paused flow run.

290

291

Parameters:

292

- flow_run_id: ID of the paused flow run to resume

293

- run_input: Optional input data to provide when resuming

294

295

Raises:

296

ValueError: If flow run is not in a paused state

297

"""

298

299

def suspend_flow_run(

300

flow_run_id: Optional[UUID] = None,

301

timeout: int = 300,

302

poll_interval: int = 10,

303

key: Optional[str] = None,

304

) -> None:

305

"""

306

Suspend a flow run for long-term storage and later resumption.

307

308

Parameters:

309

- flow_run_id: ID of the flow run to suspend (defaults to current run)

310

- timeout: Maximum time to wait for suspension acknowledgment (seconds)

311

- poll_interval: Polling interval for suspension status (seconds)

312

- key: Optional key for identifying the suspension point

313

314

Raises:

315

TimeoutError: If suspension is not acknowledged within timeout

316

"""

317

```

318

319

#### Usage Examples

320

321

```python

322

from datetime import datetime

from prefect import flow, get_run_logger

323

from prefect.flow_runs import pause_flow_run, resume_flow_run, suspend_flow_run

324

from prefect.client.orchestration import get_client

325

326

@flow

327

def interactive_flow():

328

logger = get_run_logger()

329

330

logger.info("Starting workflow")

331

332

# Pause for manual review

333

logger.info("Pausing for manual review")

334

pause_flow_run(key="manual_review", timeout=600)

335

336

logger.info("Resumed after manual review")

337

338

# Continue processing

339

return "Workflow completed"

340

341

@flow

342

def long_running_flow():

343

logger = get_run_logger()

344

345

# Process first batch

346

logger.info("Processing first batch")

347

348

# Suspend for overnight processing

349

logger.info("Suspending for overnight processing")

350

suspend_flow_run(key="overnight_break")

351

352

# Resume processing next day

353

logger.info("Resuming processing")

354

355

return "Long workflow completed"

356

357

# Resume a suspended flow programmatically

358

async def resume_workflow(flow_run_id: str):

359

client = get_client()

360

await resume_flow_run(flow_run_id, run_input={"resumed_at": datetime.now()})

361

```

362

363

### State Utilities

364

365

Utility functions for working with states, including result extraction and state conversion.

366

367

```python { .api }

368

def get_state_result(

369

state: State,

370

raise_on_failure: bool = True,

371

) -> Any:

372

"""

373

Extract the result data from a state object.

374

375

Parameters:

376

- state: State object to extract result from

377

- raise_on_failure: Whether to raise exception for failed states

378

379

Returns:

380

The result data stored in the state

381

382

Raises:

383

Exception: If state represents a failure and raise_on_failure is True

384

"""

385

386

def to_state_create(state: State) -> StateCreate:

387

"""

388

Convert a State object to StateCreate format for API submission.

389

390

Parameters:

391

- state: State object to convert

392

393

Returns:

394

StateCreate object suitable for API operations

395

"""

396

397

def exception_to_crashed_state(

398

exception: Exception,

399

message: str = None,

400

) -> State:

401

"""

402

Convert an exception to a crashed state.

403

404

Parameters:

405

- exception: Exception that caused the crash

406

- message: Optional message to include

407

408

Returns:

409

Crashed state containing exception information

410

"""

411

412

def exception_to_failed_state(

413

exception: Exception,

414

message: str = None,

415

) -> State:

416

"""

417

Convert an exception to a failed state.

418

419

Parameters:

420

- exception: Exception that caused the failure

421

- message: Optional message to include

422

423

Returns:

424

Failed state containing exception information

425

"""

426

427

def format_exception(exception: Exception) -> str:

428

"""

429

Format an exception for display in state messages.

430

431

Parameters:

432

- exception: Exception to format

433

434

Returns:

435

Formatted string representation of the exception

436

"""

437

```

438

439

#### Usage Examples

440

441

```python

442

from prefect import task, flow

443

from prefect.states import get_state_result, exception_to_failed_state

444

445

@task

446

def risky_task():

447

try:

448

# Risky operation

449

result = complex_operation()

450

return result

451

except Exception as e:

452

# Convert exception to failed state

453

return exception_to_failed_state(e, "Complex operation failed")

454

455

@flow

456

def result_processing_flow():

457

task_state = risky_task(return_state=True)  # return_state=True yields the State rather than raising/unwrapping

458

459

# Extract result safely

460

try:

461

result = get_state_result(task_state)

462

return f"Success: {result}"

463

except Exception:

464

return "Task failed, handling gracefully"

465

```

466

467

### State Class

468

469

The core State class representing the current status of flows and tasks.

470

471

```python { .api }

472

class State:

473

"""

474

Immutable state object representing the current status of a flow or task.

475

476

Attributes:

477

- type: StateType enum value (COMPLETED, FAILED, etc.)

478

- name: Optional descriptive name

479

- message: Optional status message

480

- data: Associated data or result

481

- timestamp: When the state was created

482

- state_details: Additional state-specific information

483

"""

484

485

def __init__(

486

self,

487

type: StateType,

488

name: str = None,

489

message: str = None,

490

data: Any = None,

491

timestamp: datetime = None,

492

state_details: StateDetails = None,

493

):

494

"""Initialize a state object."""

495

496

def is_scheduled(self) -> bool:

497

"""Check if state represents scheduled execution."""

498

499

def is_pending(self) -> bool:

500

"""Check if state represents pending execution."""

501

502

def is_running(self) -> bool:

503

"""Check if state represents active execution."""

504

505

def is_completed(self) -> bool:

506

"""Check if state represents successful completion."""

507

508

def is_failed(self) -> bool:

509

"""Check if state represents failure."""

510

511

def is_crashed(self) -> bool:

512

"""Check if state represents a system crash."""

513

514

def is_cancelled(self) -> bool:

515

"""Check if state represents cancellation."""

516

517

def is_cancelling(self) -> bool:

518

"""Check if state represents active cancellation."""

519

520

def is_paused(self) -> bool:

521

"""Check if state represents a pause."""

522

523

def is_suspended(self) -> bool:

524

"""Check if state represents suspension."""

525

526

def is_final(self) -> bool:

527

"""Check if state represents a final (terminal) status."""

528

529

def copy(

530

self,

531

*,

532

type: StateType = None,

533

name: str = None,

534

message: str = None,

535

data: Any = None,

536

) -> "State":

537

"""Create a copy of the state with modified attributes."""

538

539

def result(self, raise_on_failure: bool = True) -> Any:

540

"""

541

Get the result from the state.

542

543

Parameters:

544

- raise_on_failure: Whether to raise on failed states

545

546

Returns:

547

The result data from the state

548

"""

549

```

550

551

### State Groups

552

553

Utility for grouping and categorizing states by their types.

554

555

```python { .api }

556

class StateGroup:

557

"""

558

Groups states by their type for easier categorization and handling.

559

"""

560

561

# Final states that represent workflow completion

562

FINAL = frozenset([

563

StateType.COMPLETED,

564

StateType.FAILED,

565

StateType.CRASHED,

566

StateType.CANCELLED,

567

])

568

569

# Running states that represent active execution

570

RUNNING = frozenset([

571

StateType.RUNNING,

572

StateType.CANCELLING,

573

StateType.RETRYING,

574

])

575

576

# Waiting states that represent pending execution

577

WAITING = frozenset([

578

StateType.SCHEDULED,

579

StateType.PENDING,

580

StateType.AWAITING_RETRY,

581

StateType.PAUSED,

582

StateType.SUSPENDED,

583

])

584

585

@classmethod

586

def is_final(cls, state_type: StateType) -> bool:

587

"""Check if a state type is final."""

588

return state_type in cls.FINAL

589

590

@classmethod

591

def is_running(cls, state_type: StateType) -> bool:

592

"""Check if a state type represents running execution."""

593

return state_type in cls.RUNNING

594

595

@classmethod

596

def is_waiting(cls, state_type: StateType) -> bool:

597

"""Check if a state type represents waiting for execution."""

598

return state_type in cls.WAITING

599

```

600

601

#### Usage Examples

602

603

```python

604

from prefect.states import StateGroup, StateType

605

606

def handle_state(state_type: StateType):

607

if StateGroup.is_final(state_type):

608

print("Workflow has completed")

609

elif StateGroup.is_running(state_type):

610

print("Workflow is actively running")

611

elif StateGroup.is_waiting(state_type):

612

print("Workflow is waiting to run")

613

```

614

615

## Types

616

617

Types related to state management:

618

619

```python { .api }

620

from typing import Any, Optional, Dict

621

from datetime import datetime

622

from enum import Enum

623

from uuid import UUID

624

625

class StateType(str, Enum):

626

"""Enumeration of all possible state types."""

627

SCHEDULED = "SCHEDULED"

628

PENDING = "PENDING"

629

RUNNING = "RUNNING"

630

COMPLETED = "COMPLETED"

631

FAILED = "FAILED"

632

CANCELLED = "CANCELLED"

633

CRASHED = "CRASHED"

634

PAUSED = "PAUSED"

635

SUSPENDED = "SUSPENDED"

636

AWAITING_RETRY = "AWAITING_RETRY"

637

RETRYING = "RETRYING"

638

CANCELLING = "CANCELLING"

639

LATE = "LATE"

640

641

class StateDetails:

642

"""Additional details for specific state types."""

643

flow_run_id: Optional[UUID]

644

task_run_id: Optional[UUID]

645

child_flow_run_id: Optional[UUID]

646

scheduled_time: Optional[datetime]

647

cache_key: Optional[str]

648

pause_timeout: Optional[datetime]

649

pause_reschedule: Optional[bool]

650

pause_key: Optional[str]

651

run_input_keyset: Optional[Dict[str, Any]]

652

653

class StateCreate:

654

"""State creation format for API operations."""

655

type: StateType

656

name: Optional[str]

657

message: Optional[str]

658

state_details: Optional[StateDetails]

659

data: Optional[Any]

660

```