# Context & Utilities

Prefect's context and utility systems provide runtime information, logging capabilities, execution annotations, and transaction management. These components enable advanced workflow control, debugging, and data consistency across distributed execution environments.

## Capabilities

### Logging

Access to structured logging within flows and tasks for observability and debugging.
9
10
```python { .api }
11
def get_run_logger(name: str = None) -> logging.Logger:
12
"""
13
Get a logger for the current flow or task run.
14
15
Creates a logger that automatically includes run context information
16
such as flow run ID, task run ID, and other execution metadata.
17
18
Parameters:
19
- name: Custom logger name (defaults to current flow/task name)
20
21
Returns:
22
Logger instance configured for the current run context
23
24
The logger automatically includes:
25
- Run IDs for correlation
26
- Timestamps
27
- Context metadata
28
- Structured formatting for Prefect UI
29
"""
30
31
def get_logger(name: Optional[str] = None) -> logging.Logger:
    """
    Get a general-purpose Prefect logger.

    Intended for module-level or library code that runs outside a flow or
    task run; use get_run_logger() inside runs so that log records carry
    run metadata.

    Parameters:
    - name: Logger name, created as a child of the root "prefect" logger.
      When omitted, the root "prefect" logger itself is returned.

    Returns:
    Standard Python logger configured for Prefect
    """
41
42
def disable_run_logger() -> ContextManager[None]:
    """
    Temporarily disable the flow run and task run loggers.

    This is a context manager: run logging is switched off while the
    `with` block executes and the previous logger state is restored on
    exit. Calling the function without entering the returned context
    manager has no effect.

    Useful when you want to prevent automatic log capture
    or when debugging logging issues.

    Returns:
    Context manager that silences run loggers for its duration

    Usage:
    with disable_run_logger():
        # Nothing logged through run loggers here is captured
        my_task.fn()
    """
49
```
50
51
#### Usage Examples
52
53
```python
54
from prefect import flow, task, get_run_logger
55
from prefect.logging import get_logger, disable_run_logger
56
57
# Module-level logger
58
module_logger = get_logger(__name__)
59
60
@task
61
def process_data(data):
62
logger = get_run_logger()
63
64
logger.info(f"Processing {len(data)} records")
65
66
try:
67
result = complex_operation(data)
68
logger.info(f"Successfully processed {len(result)} records")
69
return result
70
except Exception as e:
71
logger.error(f"Processing failed: {e}")
72
raise
73
74
@flow
75
def data_pipeline():
76
logger = get_run_logger()
77
78
logger.info("Starting data pipeline")
79
80
# Log structured data
81
logger.info("Configuration", extra={
82
"batch_size": 1000,
83
"env": "production"
84
})
85
86
data = extract_data()
87
result = process_data(data)
88
89
logger.info(f"Pipeline completed successfully with {len(result)} records")
90
91
return result
92
93
# Disable logging in specific contexts
94
@task
def quiet_task():
    # disable_run_logger() is a context manager, so it must be entered with
    # `with`; a bare call would leave run logging untouched.
    with disable_run_logger():
        # Code inside this block won't generate run logs
        return "completed quietly"
99
```
100
101
### Context Management

Context managers and utilities for managing execution context and metadata.
104
105
```python { .api }
106
def tags(*tags: str, **kwargs) -> ContextManager:
107
"""
108
Context manager for adding tags to flow and task runs.
109
110
Tags applied within this context are automatically added to
111
any flows or tasks that execute within the context.
112
113
Parameters:
114
- tags: Tag strings to apply
115
- **kwargs: Additional tag-related configuration
116
117
Returns:
118
Context manager that applies tags to nested executions
119
120
Usage:
121
with tags("production", "etl"):
122
# Any flows/tasks run here get these tags
123
my_flow()
124
"""
125
126
class FlowRunContext:
127
"""
128
Context object containing information about the current flow run.
129
130
Attributes:
131
- flow: The Flow object being executed
132
- flow_run: The FlowRun object for current execution
133
- parameters: Flow parameters
134
- task_runner: Task runner instance
135
- client: Prefect client for API access
136
- background_tasks: Background task set
137
"""
138
139
flow: Optional[Flow]
140
flow_run: Optional[FlowRun]
141
parameters: Dict[str, Any]
142
task_runner: Optional[TaskRunner]
143
client: Optional[PrefectClient]
144
background_tasks: Optional[Set[asyncio.Task]]
145
146
@classmethod
147
def get(cls) -> Optional["FlowRunContext"]:
148
"""Get the current flow run context."""
149
150
def __enter__(self) -> "FlowRunContext":
151
"""Enter the context."""
152
153
def __exit__(self, *args) -> None:
154
"""Exit the context."""
155
156
class TaskRunContext:
157
"""
158
Context object containing information about the current task run.
159
160
Attributes:
161
- task: The Task object being executed
162
- task_run: The TaskRun object for current execution
163
- parameters: Task parameters
164
- client: Prefect client for API access
165
"""
166
167
task: Optional[Task]
168
task_run: Optional[TaskRun]
169
parameters: Dict[str, Any]
170
client: Optional[PrefectClient]
171
172
@classmethod
173
def get(cls) -> Optional["TaskRunContext"]:
174
"""Get the current task run context."""
175
```
176
177
#### Usage Examples
178
179
```python
180
from prefect import flow, task, tags
181
from prefect.context import FlowRunContext, TaskRunContext
182
183
@task
184
def analyze_data():
185
# Access task context
186
context = TaskRunContext.get()
187
if context:
188
print(f"Task: {context.task.name}")
189
print(f"Task Run ID: {context.task_run.id}")
190
191
return "analysis complete"
192
193
@flow
194
def data_workflow():
195
# Access flow context
196
context = FlowRunContext.get()
197
if context:
198
print(f"Flow: {context.flow.name}")
199
print(f"Flow Run ID: {context.flow_run.id}")
200
201
# Use tags context manager
202
with tags("critical", "production"):
203
# These tasks inherit the tags
204
result1 = analyze_data()
205
result2 = analyze_data()
206
207
# These tasks don't have the tags
208
result3 = analyze_data()
209
210
return [result1, result2, result3]
211
212
# Nested tag contexts
213
@flow
214
def complex_workflow():
215
with tags("pipeline"):
216
with tags("extract"):
217
extract_result = extract_data()
218
219
with tags("transform"):
220
transform_result = transform_data(extract_result)
221
222
with tags("load"):
223
load_result = load_data(transform_result)
224
225
return load_result
226
```
227
228
### Execution Annotations

Annotations for controlling task execution behavior, particularly in mapping operations.
231
232
```python { .api }
233
class unmapped:
234
"""
235
Annotation to mark inputs as unmapped in mapping operations.
236
237
When using task.map(), wrap arguments with unmapped() to indicate
238
they should not be mapped over but used as-is for all mapped calls.
239
"""
240
241
def __init__(self, value: Any):
242
"""
243
Initialize unmapped annotation.
244
245
Parameters:
246
- value: The value to keep unmapped
247
"""
248
249
class allow_failure:
250
"""
251
Annotation to allow failed task results to flow downstream.
252
253
Normally, if a task fails, dependent tasks don't run. Wrapping
254
a task call with allow_failure() allows downstream tasks to
255
receive the failed state and handle it gracefully.
256
"""
257
258
def __init__(self, value: Any):
259
"""
260
Initialize allow_failure annotation.
261
262
Parameters:
263
- value: The task call or value to allow failure for
264
"""
265
266
class quote:
    """
    Annotation that tells Prefect to leave a value untouched.

    quote() does not change how Python evaluates its argument. It marks
    the wrapped object so that Prefect neither coerces it nor introspects
    it (for example, walking it to look for futures or states) when it is
    passed as a flow or task parameter. Skipping that traversal matters
    for large collections, where visiting every element is expensive.
    """

    def __init__(self, expr: Any):
        """
        Initialize quote annotation.

        Parameters:
        - expr: Value to pass through without Prefect introspection
        """
281
282
# Backward compatibility alias
283
Quote = quote
284
```
285
286
#### Usage Examples
287
288
```python
289
from prefect import flow, task
290
from prefect.utilities.annotations import unmapped, allow_failure, quote
291
292
@task
293
def process_item(item, config, multiplier):
294
return item * config["factor"] * multiplier
295
296
@task
297
def handle_result(result):
298
if isinstance(result, Exception):
299
return f"Error: {result}"
300
return f"Success: {result}"
301
302
@flow
303
def mapping_example():
304
items = [1, 2, 3, 4, 5]
305
config = {"factor": 10} # Shared configuration
306
multiplier = 2 # Another shared value
307
308
# Map over items, but config and multiplier are unmapped
309
results = process_item.map(
310
items,
311
unmapped(config), # Same config for all
312
unmapped(multiplier) # Same multiplier for all
313
)
314
315
return results
316
317
@task
318
def risky_task(value):
319
if value < 0:
320
raise ValueError("Negative value")
321
return value * 2
322
323
@flow
def failure_handling_example():
    values = [-1, 2, 3, -4, 5]

    # Some of these mapped runs fail (negative inputs raise ValueError)
    results = risky_task.map(values)

    # allow_failure() wraps the *upstream futures*, not the raw inputs: it
    # lets handle_result run even when an upstream run failed, in which case
    # the raised exception is passed in place of a result
    handled = handle_result.map(allow_failure(results))

    return handled
334
335
@task
336
def expression_task(expr_string):
337
# Receive the quoted expression as a string
338
return f"Expression: {expr_string}"
339
340
@flow
def quote_example():
    x = 10
    y = 20

    # Python evaluates x + y before the call, so the task receives 30
    result1 = expression_task(x + y)

    # A string literal is already a plain value; quote() does not alter
    # Python evaluation. It only marks the argument so Prefect skips
    # introspecting it, and the task still receives the string "x + y"
    result2 = expression_task(quote("x + y"))

    return result1, result2
352
```
353
354
### Transaction Management

Transaction context for ensuring data consistency across task executions.
357
358
```python { .api }
359
class Transaction:
360
"""
361
Transaction context manager for coordinating task execution.
362
363
Provides isolation and coordination mechanisms for tasks that
364
need to execute as a unit with rollback capabilities.
365
"""
366
367
def __init__(
368
self,
369
key: Optional[str] = None,
370
timeout: Optional[float] = None,
371
):
372
"""
373
Initialize a transaction context.
374
375
Parameters:
376
- key: Unique identifier for the transaction
377
- timeout: Maximum time to hold the transaction (seconds)
378
"""
379
380
def __enter__(self) -> "Transaction":
381
"""
382
Enter the transaction context.
383
384
Returns:
385
Transaction instance for use in context
386
"""
387
388
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
389
"""
390
Exit the transaction context.
391
392
Parameters:
393
- exc_type: Exception type if an error occurred
394
- exc_val: Exception value if an error occurred
395
- exc_tb: Exception traceback if an error occurred
396
397
If an exception occurred, the transaction is rolled back.
398
Otherwise, it is committed.
399
"""
400
401
def commit(self) -> None:
402
"""Commit the transaction."""
403
404
def rollback(self) -> None:
405
"""Roll back the transaction."""
406
407
@property
408
def is_committed(self) -> bool:
409
"""Check if transaction has been committed."""
410
411
@property
412
def is_rolled_back(self) -> bool:
413
"""Check if transaction has been rolled back."""
414
```
415
416
#### Usage Examples
417
418
```python
419
from prefect import flow, task
420
from prefect.transactions import Transaction
421
422
@task
423
def update_database(data):
424
# Database update logic
425
print(f"Updating database with {data}")
426
return f"Updated: {data}"
427
428
@task
429
def send_notification(message):
430
# Notification logic
431
print(f"Sending notification: {message}")
432
return f"Sent: {message}"
433
434
@task
435
def log_audit(action):
436
# Audit logging
437
print(f"Audit log: {action}")
438
return f"Logged: {action}"
439
440
@flow
441
def transactional_workflow(data):
442
try:
443
with Transaction(key="data-update", timeout=300) as txn:
444
# All tasks in this context are part of the transaction
445
update_result = update_database(data)
446
notification_result = send_notification(f"Data updated: {data}")
447
audit_result = log_audit(f"Updated data for {data}")
448
449
# If any task fails, the entire transaction rolls back
450
return {
451
"update": update_result,
452
"notification": notification_result,
453
"audit": audit_result,
454
"transaction_id": txn.key
455
}
456
except Exception as e:
457
# Handle transaction failure
458
return {"error": str(e), "status": "rolled_back"}
459
460
# Manual transaction control
461
@flow
462
def manual_transaction_example():
463
txn = Transaction(key="manual-txn")
464
465
try:
466
txn.__enter__()
467
468
# Do work
469
result1 = update_database("batch1")
470
result2 = update_database("batch2")
471
472
# Explicit commit
473
txn.commit()
474
475
return {"results": [result1, result2], "status": "committed"}
476
477
except Exception as e:
478
# Explicit rollback
479
txn.rollback()
480
return {"error": str(e), "status": "rolled_back"}
481
482
finally:
483
if not (txn.is_committed or txn.is_rolled_back):
484
txn.__exit__(None, None, None)
485
```
486
487
### Context Serialization

Utilities for serializing and managing context across process boundaries.
490
491
```python { .api }
492
def serialize_context() -> Dict[str, Any]:
493
"""
494
Serialize the current Prefect context for cross-process communication.
495
496
Returns:
497
Dictionary containing serialized context information including:
498
- Flow run context
499
- Task run context
500
- Settings context
501
- Tag context
502
"""
503
504
def hydrated_context(context_data: Dict[str, Any]) -> ContextManager:
505
"""
506
Context manager that restores serialized context.
507
508
Parameters:
509
- context_data: Serialized context from serialize_context()
510
511
Returns:
512
Context manager that applies the hydrated context
513
"""
514
```
515
516
#### Usage Examples
517
518
```python
519
import json

from prefect import flow, task
# TaskRunContext is needed by child_task() in this example
from prefect.context import TaskRunContext, hydrated_context, serialize_context
522
523
@flow
524
def parent_flow():
525
# Serialize current context
526
context_data = serialize_context()
527
528
# Could save to file, send to another process, etc.
529
context_json = json.dumps(context_data, default=str)
530
531
# Later, restore the context
532
restored_data = json.loads(context_json)
533
534
with hydrated_context(restored_data):
535
# This runs with the restored context
536
child_task()
537
538
@task
539
def child_task():
540
# This task has access to the restored context
541
context = TaskRunContext.get()
542
if context:
543
print(f"Restored task context: {context.task_run.id}")
544
```
545
546
### Input Management

Flow run input/output management for interactive workflows and human-in-the-loop processes.
549
550
```python { .api }
551
from prefect.input import (
552
RunInput,
553
RunInputMetadata,
554
Keyset,
555
GetInputHandler,
556
send_input,
557
receive_input,
558
create_flow_run_input,
559
read_flow_run_input,
560
delete_flow_run_input,
561
)
562
563
async def send_input(
564
run_input: Any,
565
flow_run_id: UUID,
566
sender: Optional[str] = None,
567
key_prefix: Optional[str] = None,
568
) -> None:
569
"""
570
Send input to a flow run for interactive workflows.
571
572
Parameters:
573
- run_input: Input data to send (JSON-serializable)
574
- flow_run_id: ID of target flow run
575
- sender: Optional identifier of sender
576
- key_prefix: Optional prefix for input keys
577
"""
578
579
def receive_input(
580
input_type: Type[T],
581
timeout: Optional[float] = 3600,
582
poll_interval: float = 10,
583
raise_timeout_error: bool = False,
584
exclude_keys: Optional[Set[str]] = None,
585
key_prefix: Optional[str] = None,
586
flow_run_id: Optional[UUID] = None,
587
with_metadata: bool = False,
588
) -> GetInputHandler[T]:
589
"""
590
Receive input for the current flow run.
591
592
Parameters:
593
- input_type: Type of input to receive
594
- timeout: Maximum wait time in seconds
595
- poll_interval: Polling interval in seconds
596
- raise_timeout_error: Whether to raise error on timeout
597
- exclude_keys: Keys to exclude from input
598
- key_prefix: Prefix for input keys
599
- flow_run_id: Specific flow run ID (defaults to current)
600
- with_metadata: Whether to include metadata
601
602
Returns:
603
Input handler for receiving typed input
604
"""
605
606
def create_flow_run_input(
607
flow_run_id: UUID,
608
key: str,
609
value: Any,
610
sender: Optional[str] = None,
611
) -> RunInput:
612
"""
613
Create flow run input for manual input management.
614
615
Parameters:
616
- flow_run_id: Target flow run ID
617
- key: Input key identifier
618
- value: Input value (JSON-serializable)
619
- sender: Optional sender identifier
620
621
Returns:
622
Created RunInput object
623
"""
624
625
class RunInput(BaseModel):
626
"""Flow run input data container."""
627
flow_run_id: UUID
628
key: str
629
value: Any
630
sender: Optional[str]
631
created: datetime
632
633
class RunInputMetadata(BaseModel):
634
"""Metadata for flow run inputs."""
635
key: str
636
sender: Optional[str]
637
created: datetime
638
639
class Keyset:
    """Set of input keys for input management."""

    def __init__(self, keys: Set[str]):
        # Stored as given (not copied), so later changes made to the
        # caller's set are visible through this Keyset
        self.keys = keys

    def __contains__(self, key: str) -> bool:
        """Check if key is in keyset."""
        return key in self.keys
647
648
class GetInputHandler(Generic[T]):
649
"""Handler for receiving typed input."""
650
651
def get(self, key: str, default: T = None) -> T:
652
"""Get input by key."""
653
654
def __getitem__(self, key: str) -> T:
655
"""Get input by key (dict-like access)."""
656
657
def keys(self) -> Set[str]:
658
"""Get available input keys."""
659
```
660
661
#### Usage Examples
662
663
```python
664
import asyncio
from uuid import UUID  # used by send_approval() to build the flow run ID

from prefect import flow, task
from prefect.input import send_input, receive_input
from prefect.states import Paused
668
669
# Interactive workflow with human input
670
@flow
671
def approval_workflow(document_id: str):
672
"""Workflow requiring human approval."""
673
# Process document
674
processed_doc = process_document(document_id)
675
676
# Pause for human review
677
approval_input = receive_input(
678
input_type=dict,
679
timeout=3600, # 1 hour timeout
680
key_prefix="approval"
681
)
682
683
# Wait for approval input
684
approval = approval_input.get("decision")
685
comments = approval_input.get("comments", "")
686
687
if approval == "approved":
688
return finalize_document(processed_doc, comments)
689
else:
690
return reject_document(processed_doc, comments)
691
692
# Sending input to paused flow
693
async def send_approval():
694
"""Send approval input to paused flow."""
695
flow_run_id = UUID("...") # Get from flow run
696
697
await send_input(
698
run_input={
699
"decision": "approved",
700
"comments": "Looks good, approved for publication"
701
},
702
flow_run_id=flow_run_id,
703
sender="manager@company.com",
704
key_prefix="approval"
705
)
706
707
# Multi-step input collection
708
@flow
709
def data_collection_workflow():
710
"""Collect multiple inputs over time."""
711
712
# Step 1: Initial parameters
713
config_input = receive_input(
714
input_type=dict,
715
key_prefix="config",
716
timeout=1800 # 30 minutes
717
)
718
719
config = config_input.get("parameters")
720
721
# Step 2: Data processing with config
722
results = process_with_config(config)
723
724
# Step 3: Review results and get feedback
725
feedback_input = receive_input(
726
input_type=dict,
727
key_prefix="feedback",
728
timeout=3600 # 1 hour
729
)
730
731
feedback = feedback_input.get("review")
732
733
if feedback.get("needs_revision"):
734
# Process feedback and revise
735
return revise_results(results, feedback)
736
else:
737
return finalize_results(results)
738
739
@task
740
def process_document(doc_id: str):
741
return {"id": doc_id, "status": "processed"}
742
743
@task
744
def finalize_document(doc: dict, comments: str):
745
return {"id": doc["id"], "status": "finalized", "comments": comments}
746
747
@task
748
def reject_document(doc: dict, reason: str):
749
return {"id": doc["id"], "status": "rejected", "reason": reason}
750
```
751
752
## Types

Types related to context and utilities:
755
756
```python { .api }
757
import asyncio
import logging
from datetime import datetime
from typing import (
    Any,
    ContextManager,
    Dict,
    Generic,
    List,
    Optional,
    Set,
    Type,
    TypeVar,
)
from uuid import UUID

# Type variable used by the generic input APIs (receive_input, GetInputHandler)
T = TypeVar("T")
761
762
class ContextModel:
763
"""Base model for context data objects."""
764
765
@classmethod
766
def get(cls) -> Optional["ContextModel"]:
767
"""Get current context instance."""
768
769
class TagsContext(ContextModel):
770
"""Context for managing tags."""
771
current_tags: Set[str]
772
773
def add_tags(self, *tags: str) -> None:
774
"""Add tags to current context."""
775
776
def remove_tags(self, *tags: str) -> None:
777
"""Remove tags from current context."""
778
779
class SettingsContext(ContextModel):
780
"""Context for Prefect settings."""
781
profile: Profile
782
settings: Dict[str, Any]
783
784
# Annotation base class
785
class BaseAnnotation:
    """Base class for execution annotations.

    Wraps a single value so it can be tagged with extra meaning and later
    recovered unchanged with unwrap().
    """

    def __init__(self, value: Any):
        self.value = value

    def unwrap(self) -> Any:
        """Unwrap the annotated value."""
        return self.value

    def rewrap(self, value: Any) -> "BaseAnnotation":
        """Wrap a different value in the same annotation type as this one."""
        # type(self) rather than BaseAnnotation so subclasses are preserved
        return type(self)(value)

    def __repr__(self) -> str:
        """Return a debuggable form such as ``BaseAnnotation(5)``."""
        return f"{type(self).__name__}({self.value!r})"
794
795
# Log eavesdropper for capturing logs
796
class LogEavesdropper:
797
"""Utility for capturing and managing log output."""
798
799
def __init__(self, logger: logging.Logger):
800
"""Initialize log eavesdropper."""
801
802
def __enter__(self) -> "LogEavesdropper":
803
"""Start capturing logs."""
804
805
def __exit__(self, *args) -> None:
806
"""Stop capturing logs."""
807
808
def get_logs(self) -> List[logging.LogRecord]:
809
"""Get captured log records."""
810
```