or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.md · configuration.md · context-utilities.md · core-workflows.md · deployments.md · index.md · runtime-context.md · state-management.md · variables.md

docs/context-utilities.md

0

# Context & Utilities

1

2

Prefect's context and utility systems provide runtime information, logging capabilities, execution annotations, transaction management, context serialization, and flow run input handling. These components enable advanced workflow control, debugging, and data consistency across distributed execution environments.

3

4

## Capabilities

5

6

### Logging

7

8

Access to structured logging within flows and tasks for observability and debugging.

9

10

```python { .api }

11

def get_run_logger(name: str = None) -> logging.Logger:

12

"""

13

Get a logger for the current flow or task run.

14

15

Creates a logger that automatically includes run context information

16

such as flow run ID, task run ID, and other execution metadata.

17

18

Parameters:

19

- name: Custom logger name (defaults to current flow/task name)

20

21

Returns:

22

Logger instance configured for the current run context

23

24

The logger automatically includes:

25

- Run IDs for correlation

26

- Timestamps

27

- Context metadata

28

- Structured formatting for Prefect UI

29

"""

30

31

def get_logger(name: str = None) -> logging.Logger:

32

"""

33

Get a general-purpose Prefect logger.

34

35

Parameters:

36

- name: Logger name (defaults to calling module)

37

38

Returns:

39

Standard Python logger configured for Prefect

40

"""

41

42

def disable_run_logger() -> None:

43

"""

44

Disable run-specific logging for the current context.

45

46

Useful when you want to prevent automatic log capture

47

or when debugging logging issues.

48

"""

49

```

50

51

#### Usage Examples

52

53

```python

54

from prefect import flow, task, get_run_logger

55

from prefect.logging import get_logger, disable_run_logger

56

57

# Module-level logger

58

module_logger = get_logger(__name__)

59

60

@task

61

def process_data(data):

62

logger = get_run_logger()

63

64

logger.info(f"Processing {len(data)} records")

65

66

try:

67

result = complex_operation(data)

68

logger.info(f"Successfully processed {len(result)} records")

69

return result

70

except Exception as e:

71

logger.error(f"Processing failed: {e}")

72

raise

73

74

@flow

75

def data_pipeline():

76

logger = get_run_logger()

77

78

logger.info("Starting data pipeline")

79

80

# Log structured data

81

logger.info("Configuration", extra={

82

"batch_size": 1000,

83

"env": "production"

84

})

85

86

data = extract_data()

87

result = process_data(data)

88

89

logger.info(f"Pipeline completed successfully with {len(result)} records")

90

91

return result

92

93

# Disable logging in specific contexts

94

@task

95

def quiet_task():

96

disable_run_logger()

97

# This task won't generate run logs

98

return "completed quietly"

99

```

100

101

### Context Management

102

103

Context managers and utilities for managing execution context and metadata.

104

105

```python { .api }

106

def tags(*tags: str, **kwargs) -> ContextManager:

107

"""

108

Context manager for adding tags to flow and task runs.

109

110

Tags applied within this context are automatically added to

111

any flows or tasks that execute within the context.

112

113

Parameters:

114

- tags: Tag strings to apply

115

- **kwargs: Additional tag-related configuration

116

117

Returns:

118

Context manager that applies tags to nested executions

119

120

Usage:

121

with tags("production", "etl"):

122

# Any flows/tasks run here get these tags

123

my_flow()

124

"""

125

126

class FlowRunContext:

127

"""

128

Context object containing information about the current flow run.

129

130

Attributes:

131

- flow: The Flow object being executed

132

- flow_run: The FlowRun object for current execution

133

- parameters: Flow parameters

134

- task_runner: Task runner instance

135

- client: Prefect client for API access

136

- background_tasks: Background task set

137

"""

138

139

flow: Optional[Flow]

140

flow_run: Optional[FlowRun]

141

parameters: Dict[str, Any]

142

task_runner: Optional[TaskRunner]

143

client: Optional[PrefectClient]

144

background_tasks: Optional[Set[asyncio.Task]]

145

146

@classmethod

147

def get(cls) -> Optional["FlowRunContext"]:

148

"""Get the current flow run context."""

149

150

def __enter__(self) -> "FlowRunContext":

151

"""Enter the context."""

152

153

def __exit__(self, *args) -> None:

154

"""Exit the context."""

155

156

class TaskRunContext:

157

"""

158

Context object containing information about the current task run.

159

160

Attributes:

161

- task: The Task object being executed

162

- task_run: The TaskRun object for current execution

163

- parameters: Task parameters

164

- client: Prefect client for API access

165

"""

166

167

task: Optional[Task]

168

task_run: Optional[TaskRun]

169

parameters: Dict[str, Any]

170

client: Optional[PrefectClient]

171

172

@classmethod

173

def get(cls) -> Optional["TaskRunContext"]:

174

"""Get the current task run context."""

175

```

176

177

#### Usage Examples

178

179

```python

180

from prefect import flow, task, tags

181

from prefect.context import FlowRunContext, TaskRunContext

182

183

@task

184

def analyze_data():

185

# Access task context

186

context = TaskRunContext.get()

187

if context:

188

print(f"Task: {context.task.name}")

189

print(f"Task Run ID: {context.task_run.id}")

190

191

return "analysis complete"

192

193

@flow

194

def data_workflow():

195

# Access flow context

196

context = FlowRunContext.get()

197

if context:

198

print(f"Flow: {context.flow.name}")

199

print(f"Flow Run ID: {context.flow_run.id}")

200

201

# Use tags context manager

202

with tags("critical", "production"):

203

# These tasks inherit the tags

204

result1 = analyze_data()

205

result2 = analyze_data()

206

207

# These tasks don't have the tags

208

result3 = analyze_data()

209

210

return [result1, result2, result3]

211

212

# Nested tag contexts

213

@flow

214

def complex_workflow():

215

with tags("pipeline"):

216

with tags("extract"):

217

extract_result = extract_data()

218

219

with tags("transform"):

220

transform_result = transform_data(extract_result)

221

222

with tags("load"):

223

load_result = load_data(transform_result)

224

225

return load_result

226

```

227

228

### Execution Annotations

229

230

Annotations for controlling task execution behavior, particularly in mapping operations.

231

232

```python { .api }

233

class unmapped:

234

"""

235

Annotation to mark inputs as unmapped in mapping operations.

236

237

When using task.map(), wrap arguments with unmapped() to indicate

238

they should not be mapped over but used as-is for all mapped calls.

239

"""

240

241

def __init__(self, value: Any):

242

"""

243

Initialize unmapped annotation.

244

245

Parameters:

246

- value: The value to keep unmapped

247

"""

248

249

class allow_failure:

250

"""

251

Annotation to allow failed task results to flow downstream.

252

253

Normally, if a task fails, dependent tasks don't run. Wrapping

254

an upstream future or state with allow_failure() lets the downstream task

255

run anyway and receive the attached exception so it can handle it gracefully.

256

"""

257

258

def __init__(self, value: Any):

259

"""

260

Initialize allow_failure annotation.

261

262

Parameters:

263

- value: The task call or value to allow failure for

264

"""

265

266

class quote:

267

"""

268

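Annotation that marks a value so Prefect passes it through without introspecting or coercing it.

269

270

Use quote() to stop Prefect from traversing a parameter (for example, a large

271

collection) when resolving task inputs; Python still evaluates the wrapped expression as usual.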

272

"""

273

274

def __init__(self, expr: Any):

275

"""

276

Initialize quote annotation.

277

278

Parameters:

279

- expr: Expression to quote

280

"""

281

282

# Backward compatibility alias

283

Quote = quote

284

```

285

286

#### Usage Examples

287

288

```python

289

from prefect import flow, task

290

from prefect.utilities.annotations import unmapped, allow_failure, quote

291

292

@task

293

def process_item(item, config, multiplier):

294

return item * config["factor"] * multiplier

295

296

@task

297

def handle_result(result):

298

if isinstance(result, Exception):

299

return f"Error: {result}"

300

return f"Success: {result}"

301

302

@flow

303

def mapping_example():

304

items = [1, 2, 3, 4, 5]

305

config = {"factor": 10} # Shared configuration

306

multiplier = 2 # Another shared value

307

308

# Map over items, but config and multiplier are unmapped

309

results = process_item.map(

310

items,

311

unmapped(config), # Same config for all

312

unmapped(multiplier) # Same multiplier for all

313

)

314

315

return results

316

317

@task

318

def risky_task(value):

319

if value < 0:

320

raise ValueError("Negative value")

321

return value * 2

322

323

@flow

324

def failure_handling_example():

325

values = [-1, 2, 3, -4, 5]

326

327

# Map over the inputs; the runs for negative values will fail

328

results = risky_task.map(values)

329

330

# allow_failure() lets the downstream task run on both successful and failed upstream results

331

handled = handle_result.map(allow_failure(results))

332

333

return handled

334

335

@task

336

def expression_task(expr_string):

337

# Receive the quoted expression as a string

338

return f"Expression: {expr_string}"

339

340

@flow

341

def quote_example():

342

x = 10

343

y = 20

344

345

# Without quote, x + y is evaluated to 30

346

result1 = expression_task(x + y)

347

348

# quote() wraps the literal string "x + y"; Prefect passes it to the task without introspecting it

349

result2 = expression_task(quote("x + y"))

350

351

return result1, result2

352

```

353

354

### Transaction Management

355

356

Transaction context for ensuring data consistency across task executions.

357

358

```python { .api }

359

class Transaction:

360

"""

361

Transaction context manager for coordinating task execution.

362

363

Provides isolation and coordination mechanisms for tasks that

364

need to execute as a unit with rollback capabilities.

365

"""

366

367

def __init__(

368

self,

369

key: Optional[str] = None,

370

timeout: Optional[float] = None,

371

):

372

"""

373

Initialize a transaction context.

374

375

Parameters:

376

- key: Unique identifier for the transaction

377

- timeout: Maximum time to hold the transaction (seconds)

378

"""

379

380

def __enter__(self) -> "Transaction":

381

"""

382

Enter the transaction context.

383

384

Returns:

385

Transaction instance for use in context

386

"""

387

388

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

389

"""

390

Exit the transaction context.

391

392

Parameters:

393

- exc_type: Exception type if an error occurred

394

- exc_val: Exception value if an error occurred

395

- exc_tb: Exception traceback if an error occurred

396

397

If an exception occurred, the transaction is rolled back.

398

Otherwise, it is committed.

399

"""

400

401

def commit(self) -> None:

402

"""Commit the transaction."""

403

404

def rollback(self) -> None:

405

"""Roll back the transaction."""

406

407

@property

408

def is_committed(self) -> bool:

409

"""Check if transaction has been committed."""

410

411

@property

412

def is_rolled_back(self) -> bool:

413

"""Check if transaction has been rolled back."""

414

```

415

416

#### Usage Examples

417

418

```python

419

from prefect import flow, task

420

from prefect.transactions import Transaction

421

422

@task

423

def update_database(data):

424

# Database update logic

425

print(f"Updating database with {data}")

426

return f"Updated: {data}"

427

428

@task

429

def send_notification(message):

430

# Notification logic

431

print(f"Sending notification: {message}")

432

return f"Sent: {message}"

433

434

@task

435

def log_audit(action):

436

# Audit logging

437

print(f"Audit log: {action}")

438

return f"Logged: {action}"

439

440

@flow

441

def transactional_workflow(data):

442

try:

443

with Transaction(key="data-update", timeout=300) as txn:

444

# All tasks in this context are part of the transaction

445

update_result = update_database(data)

446

notification_result = send_notification(f"Data updated: {data}")

447

audit_result = log_audit(f"Updated data for {data}")

448

449

# If any task fails, the entire transaction rolls back

450

return {

451

"update": update_result,

452

"notification": notification_result,

453

"audit": audit_result,

454

"transaction_id": txn.key

455

}

456

except Exception as e:

457

# Handle transaction failure

458

return {"error": str(e), "status": "rolled_back"}

459

460

# Manual transaction control

461

@flow

462

def manual_transaction_example():

463

txn = Transaction(key="manual-txn")

464

465

try:

466

txn.__enter__()

467

468

# Do work

469

result1 = update_database("batch1")

470

result2 = update_database("batch2")

471

472

# Explicit commit

473

txn.commit()

474

475

return {"results": [result1, result2], "status": "committed"}

476

477

except Exception as e:

478

# Explicit rollback

479

txn.rollback()

480

return {"error": str(e), "status": "rolled_back"}

481

482

finally:

483

if not (txn.is_committed or txn.is_rolled_back):

484

txn.__exit__(None, None, None)

485

```

486

487

### Context Serialization

488

489

Utilities for serializing and managing context across process boundaries.

490

491

```python { .api }

492

def serialize_context() -> Dict[str, Any]:

493

"""

494

Serialize the current Prefect context for cross-process communication.

495

496

Returns:

497

Dictionary containing serialized context information including:

498

- Flow run context

499

- Task run context

500

- Settings context

501

- Tag context

502

"""

503

504

def hydrated_context(context_data: Dict[str, Any]) -> ContextManager:

505

"""

506

Context manager that restores serialized context.

507

508

Parameters:

509

- context_data: Serialized context from serialize_context()

510

511

Returns:

512

Context manager that applies the hydrated context

513

"""

514

```

515

516

#### Usage Examples

517

518

```python

519

from prefect.context import serialize_context, hydrated_context, TaskRunContext

520

from prefect import flow, task

521

import json

522

523

@flow

524

def parent_flow():

525

# Serialize current context

526

context_data = serialize_context()

527

528

# Could save to file, send to another process, etc.

529

context_json = json.dumps(context_data, default=str)

530

531

# Later, restore the context

532

restored_data = json.loads(context_json)

533

534

with hydrated_context(restored_data):

535

# This runs with the restored context

536

child_task()

537

538

@task

539

def child_task():

540

# This task has access to the restored context

541

context = TaskRunContext.get()

542

if context:

543

print(f"Restored task context: {context.task_run.id}")

544

```

545

546

### Input Management

547

548

Flow run input/output management for interactive workflows and human-in-the-loop processes.

549

550

```python { .api }

551

from prefect.input import (

552

RunInput,

553

RunInputMetadata,

554

Keyset,

555

GetInputHandler,

556

send_input,

557

receive_input,

558

create_flow_run_input,

559

read_flow_run_input,

560

delete_flow_run_input,

561

)

562

563

async def send_input(

564

run_input: Any,

565

flow_run_id: UUID,

566

sender: Optional[str] = None,

567

key_prefix: Optional[str] = None,

568

) -> None:

569

"""

570

Send input to a flow run for interactive workflows.

571

572

Parameters:

573

- run_input: Input data to send (JSON-serializable)

574

- flow_run_id: ID of target flow run

575

- sender: Optional identifier of sender

576

- key_prefix: Optional prefix for input keys

577

"""

578

579

def receive_input(

580

input_type: Type[T],

581

timeout: Optional[float] = 3600,

582

poll_interval: float = 10,

583

raise_timeout_error: bool = False,

584

exclude_keys: Optional[Set[str]] = None,

585

key_prefix: Optional[str] = None,

586

flow_run_id: Optional[UUID] = None,

587

with_metadata: bool = False,

588

) -> GetInputHandler[T]:

589

"""

590

Receive input for the current flow run.

591

592

Parameters:

593

- input_type: Type of input to receive

594

- timeout: Maximum wait time in seconds

595

- poll_interval: Polling interval in seconds

596

- raise_timeout_error: Whether to raise error on timeout

597

- exclude_keys: Keys to exclude from input

598

- key_prefix: Prefix for input keys

599

- flow_run_id: Specific flow run ID (defaults to current)

600

- with_metadata: Whether to include metadata

601

602

Returns:

603

Input handler for receiving typed input

604

"""

605

606

def create_flow_run_input(

607

flow_run_id: UUID,

608

key: str,

609

value: Any,

610

sender: Optional[str] = None,

611

) -> RunInput:

612

"""

613

Create flow run input for manual input management.

614

615

Parameters:

616

- flow_run_id: Target flow run ID

617

- key: Input key identifier

618

- value: Input value (JSON-serializable)

619

- sender: Optional sender identifier

620

621

Returns:

622

Created RunInput object

623

"""

624

625

class RunInput(BaseModel):

626

"""Flow run input data container."""

627

flow_run_id: UUID

628

key: str

629

value: Any

630

sender: Optional[str]

631

created: datetime

632

633

class RunInputMetadata(BaseModel):

634

"""Metadata for flow run inputs."""

635

key: str

636

sender: Optional[str]

637

created: datetime

638

639

class Keyset:

640

"""Set of input keys for input management."""

641

642

def __init__(self, keys: Set[str]):

643

self.keys = keys

644

645

def __contains__(self, key: str) -> bool:

646

"""Check if key is in keyset."""

647

648

class GetInputHandler(Generic[T]):

649

"""Handler for receiving typed input."""

650

651

def get(self, key: str, default: T = None) -> T:

652

"""Get input by key."""

653

654

def __getitem__(self, key: str) -> T:

655

"""Get input by key (dict-like access)."""

656

657

def keys(self) -> Set[str]:

658

"""Get available input keys."""

659

```

660

661

#### Usage Examples

662

663

```python

664

from prefect import flow, task

665

from prefect.input import send_input, receive_input

666

from prefect.states import Paused

667

import asyncio
from uuid import UUID

668

669

# Interactive workflow with human input

670

@flow

671

def approval_workflow(document_id: str):

672

"""Workflow requiring human approval."""

673

# Process document

674

processed_doc = process_document(document_id)

675

676

# Pause for human review

677

approval_input = receive_input(

678

input_type=dict,

679

timeout=3600, # 1 hour timeout

680

key_prefix="approval"

681

)

682

683

# Wait for approval input

684

approval = approval_input.get("decision")

685

comments = approval_input.get("comments", "")

686

687

if approval == "approved":

688

return finalize_document(processed_doc, comments)

689

else:

690

return reject_document(processed_doc, comments)

691

692

# Sending input to a paused flow

693

async def send_approval():

694

"""Send approval input to paused flow."""

695

flow_run_id = UUID("...") # Get from flow run

696

697

await send_input(

698

run_input={

699

"decision": "approved",

700

"comments": "Looks good, approved for publication"

701

},

702

flow_run_id=flow_run_id,

703

sender="manager@company.com",

704

key_prefix="approval"

705

)

706

707

# Multi-step input collection

708

@flow

709

def data_collection_workflow():

710

"""Collect multiple inputs over time."""

711

712

# Step 1: Initial parameters

713

config_input = receive_input(

714

input_type=dict,

715

key_prefix="config",

716

timeout=1800 # 30 minutes

717

)

718

719

config = config_input.get("parameters")

720

721

# Step 2: Data processing with config

722

results = process_with_config(config)

723

724

# Step 3: Review results and get feedback

725

feedback_input = receive_input(

726

input_type=dict,

727

key_prefix="feedback",

728

timeout=3600 # 1 hour

729

)

730

731

feedback = feedback_input.get("review")

732

733

if feedback.get("needs_revision"):

734

# Process feedback and revise

735

return revise_results(results, feedback)

736

else:

737

return finalize_results(results)

738

739

@task

740

def process_document(doc_id: str):

741

return {"id": doc_id, "status": "processed"}

742

743

@task

744

def finalize_document(doc: dict, comments: str):

745

return {"id": doc["id"], "status": "finalized", "comments": comments}

746

747

@task

748

def reject_document(doc: dict, reason: str):

749

return {"id": doc["id"], "status": "rejected", "reason": reason}

750

```

751

752

## Types

753

754

Types related to context and utilities:

755

756

```python { .api }

757

from typing import Any, Dict, List, Optional, Set, ContextManager

758

from uuid import UUID

759

import asyncio

760

import logging

761

762

class ContextModel:

763

"""Base model for context data objects."""

764

765

@classmethod

766

def get(cls) -> Optional["ContextModel"]:

767

"""Get current context instance."""

768

769

class TagsContext(ContextModel):

770

"""Context for managing tags."""

771

current_tags: Set[str]

772

773

def add_tags(self, *tags: str) -> None:

774

"""Add tags to current context."""

775

776

def remove_tags(self, *tags: str) -> None:

777

"""Remove tags from current context."""

778

779

class SettingsContext(ContextModel):

780

"""Context for Prefect settings."""

781

profile: Profile

782

settings: Dict[str, Any]

783

784

# Annotation base class

785

class BaseAnnotation:

786

"""Base class for execution annotations."""

787

788

def __init__(self, value: Any):

789

self.value = value

790

791

def unwrap(self) -> Any:

792

"""Unwrap the annotated value."""

793

return self.value

794

795

# Log eavesdropper for capturing logs

796

class LogEavesdropper:

797

"""Utility for capturing and managing log output."""

798

799

def __init__(self, logger: logging.Logger):

800

"""Initialize log eavesdropper."""

801

802

def __enter__(self) -> "LogEavesdropper":

803

"""Start capturing logs."""

804

805

def __exit__(self, *args) -> None:

806

"""Stop capturing logs."""

807

808

def get_logs(self) -> List[logging.LogRecord]:

809

"""Get captured log records."""

810

```