Workflow orchestration and management framework for building resilient data pipelines.
—
Prefect provides runtime context modules that allow flows and tasks to access information about their current execution environment. These modules provide dynamic access to deployment parameters, flow run metadata, and task run details during execution.
Access current deployment information including parameters, ID, and version during flow execution.
import prefect.runtime.deployment# Available attributes on prefect.runtime.deployment:
id: str # The deployment's unique ID
name: str # The deployment's name
version: str # The deployment's version
flow_run_id: str # The current flow run ID for this deployment
parameters: Dict[str, Any] # Parameters passed to this deployment runUsage Example:
from prefect import flow
import prefect.runtime.deployment
@flow
def my_flow():
# Access deployment information
deployment_id = prefect.runtime.deployment.id
params = prefect.runtime.deployment.parameters
version = prefect.runtime.deployment.version
print(f"Running deployment {deployment_id} version {version}")
print(f"Parameters: {params}")Access current flow run information including metadata, parameters, and execution details.
import prefect.runtime.flow_run# Available attributes on prefect.runtime.flow_run:
id: str # The flow run's unique ID
tags: List[str] # The flow run's set of tags
scheduled_start_time: datetime # Expected scheduled start time
name: str # Name of the flow run
flow_name: str # Name of the flow
flow_version: str # Version of the flow
parameters: Dict[str, Any] # Parameters passed to this run
parent_flow_run_id: Optional[str] # ID of parent flow run (if subflow)
parent_deployment_id: Optional[str] # ID of parent deployment (if any)
root_flow_run_id: str # ID of root flow run in hierarchy
run_count: int # Number of times this flow run has been executedUsage Example:
from prefect import flow
import prefect.runtime.flow_run
@flow
def my_flow():
# Access flow run context
flow_run_id = prefect.runtime.flow_run.id
tags = prefect.runtime.flow_run.tags
parameters = prefect.runtime.flow_run.parameters
run_count = prefect.runtime.flow_run.run_count
print(f"Flow run {flow_run_id} (attempt {run_count})")
print(f"Tags: {tags}, Parameters: {parameters}")Access current task run information and metadata during task execution.
import prefect.runtime.task_run# Available attributes on prefect.runtime.task_run:
id: str # The task run's unique ID
name: str # Name of the task run
task_key: str # Unique key identifying the task
flow_run_id: str # ID of the parent flow run
task_inputs: Dict[str, Any] # Input parameters for the task
run_count: int # Number of times this task run has been executedUsage Example:
from prefect import task
import prefect.runtime.task_run
@task
def my_task():
# Access task run context
task_run_id = prefect.runtime.task_run.id
task_name = prefect.runtime.task_run.name
task_key = prefect.runtime.task_run.task_key
run_count = prefect.runtime.task_run.run_count
print(f"Task {task_name} ({task_key}) run {run_count}")
return f"Executed task run {task_run_id}"from prefect import flow, task
import prefect.runtime.deployment
import prefect.runtime.flow_run
@task
def process_data(data_source: str):
# Use deployment parameters to configure processing
config = prefect.runtime.deployment.parameters.get("processing_config", {})
if "debug" in prefect.runtime.flow_run.tags:
print(f"Debug mode: processing {data_source} with config {config}")
return f"Processed {data_source}"
@flow
def data_pipeline():
# Access runtime context for dynamic behavior
env = prefect.runtime.deployment.parameters.get("environment", "prod")
data_source = f"s3://bucket/{env}/data.csv"
return process_data(data_source)Runtime attributes can be mocked using environment variables:
import os
# Mock deployment context
os.environ["PREFECT__RUNTIME__DEPLOYMENT__ID"] = "test-deployment-id"
os.environ["PREFECT__RUNTIME__DEPLOYMENT__PARAMETERS"] = '{"env": "test"}'
# Mock flow run context
os.environ["PREFECT__RUNTIME__FLOW_RUN__ID"] = "test-flow-run-id"
os.environ["PREFECT__RUNTIME__FLOW_RUN__TAGS"] = '["test", "debug"]'Note: Runtime context modules return empty values when not executing within a Prefect flow run context.
Install with Tessl CLI
npx tessl i tessl/pypi-prefect