0
# Standard Components
1
2
Version-compatible standard Airflow operators, triggers, and utilities including PythonOperator, TimeDeltaTrigger, and virtualenv preparation functions. These components provide consistent interfaces to core Airflow functionality across different versions.
3
4
## Capabilities
5
6
### Python Operator
7
8
Version-compatible Python operator for executing Python functions in Airflow tasks.
9
10
```python { .api }
11
class PythonOperator(BaseOperator):
12
"""
13
Version-compatible Python operator.
14
15
Maps to airflow.providers.standard.operators.python.PythonOperator in Airflow 3.0+
16
Maps to airflow.operators.python.PythonOperator in Airflow < 3.0
17
"""
18
```
19
20
### Short Circuit Operator
21
22
Operator that allows conditional workflow execution by short-circuiting downstream tasks.
23
24
```python { .api }
25
class ShortCircuitOperator(BaseOperator):
26
"""
27
Version-compatible short circuit operator.
28
29
Maps to airflow.providers.standard.operators.python.ShortCircuitOperator in Airflow 3.0+
30
Maps to airflow.operators.python.ShortCircuitOperator in Airflow < 3.0
31
"""
32
```
33
34
### Serializers
35
36
Serialization utilities for Python operator data persistence.
37
38
```python { .api }
39
_SERIALIZERS: dict
40
"""
41
Serializers for Python operator data.
42
Contains serialization functions for various data types.
43
"""
44
```
45
46
### Context Functions
47
48
Utilities for accessing Airflow execution context within tasks.
49
50
```python { .api }
51
def get_current_context():
52
"""
53
Get the current Airflow execution context.
54
55
Returns:
56
Context: Current task execution context containing dag_run, task_instance, etc.
57
58
Maps to airflow.providers.standard.operators.python.get_current_context in Airflow 3.0+
59
Maps to airflow.operators.python.get_current_context in Airflow < 3.0
60
"""
61
```
62
63
### Time Delta Trigger
64
65
Deferrable trigger that waits for a specified time duration.
66
67
```python { .api }
68
class TimeDeltaTrigger:
69
"""
70
Time delta trigger for deferrable operators.
71
72
Maps to airflow.providers.standard.triggers.temporal.TimeDeltaTrigger in Airflow 3.0+
73
Maps to airflow.triggers.temporal.TimeDeltaTrigger in Airflow < 3.0
74
"""
75
```
76
77
### Virtual Environment Utilities
78
79
Functions for preparing and managing Python virtual environments for task execution.
80
81
```python { .api }
82
def write_python_script(...):
83
"""
84
Write Python scripts for virtualenv execution.
85
86
Maps to airflow.providers.standard.operators.python.write_python_script in Airflow 3.0+
87
Maps to airflow.operators.python.write_python_script in Airflow < 3.0
88
"""
89
90
def prepare_virtualenv(...):
91
"""
92
Prepare virtual environment for Python execution.
93
94
Maps to airflow.providers.standard.operators.python.prepare_virtualenv in Airflow 3.0+
95
Maps to airflow.operators.python.prepare_virtualenv in Airflow < 3.0
96
"""
97
```
98
99
## Usage Examples
100
101
```python
102
from airflow.providers.common.compat.standard.operators import (
103
PythonOperator,
104
ShortCircuitOperator,
105
get_current_context
106
)
107
from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger
108
from airflow.providers.common.compat.standard.utils import write_python_script, prepare_virtualenv
109
110
from airflow import DAG
111
from datetime import datetime, timedelta
112
113
# Create DAG
114
dag = DAG(
115
"example_standard_components",
116
start_date=datetime(2024, 1, 1),
117
schedule_interval=timedelta(days=1),
118
catchup=False
119
)
120
121
# Python task function
122
def my_python_function(**context):
123
# Access current context
124
current_context = get_current_context()
125
126
print(f"Task ID: {current_context['task_instance'].task_id}")
127
print(f"Execution date: {current_context['execution_date']}")
128
129
return "Task completed successfully"
130
131
# Create PythonOperator task
132
python_task = PythonOperator(
133
task_id="python_task",
134
python_callable=my_python_function,
135
dag=dag
136
)
137
138
# Short circuit condition function
139
def should_continue(**context):
140
# Some business logic to determine if workflow should continue
141
execution_date = context['execution_date']
142
return execution_date.weekday() < 5 # Only run on weekdays
143
144
# Create ShortCircuitOperator task
145
gate_task = ShortCircuitOperator(
146
task_id="weekday_gate",
147
python_callable=should_continue,
148
dag=dag
149
)
150
151
# Deferrable task using TimeDeltaTrigger
152
from airflow.sensors.base import BaseSensorOperator
153
154
class WaitSensor(BaseSensorOperator):
155
def __init__(self, wait_duration: timedelta, **kwargs):
156
super().__init__(**kwargs)
157
self.wait_duration = wait_duration
158
159
def execute(self, context):
160
if not self.poke(context):
161
self.defer(
162
trigger=TimeDeltaTrigger(delta=self.wait_duration),
163
method_name="execute_complete"
164
)
165
166
def poke(self, context):
167
return False # Always defer
168
169
def execute_complete(self, context, event):
170
return "Wait completed"
171
172
wait_task = WaitSensor(
173
task_id="wait_5_minutes",
174
wait_duration=timedelta(minutes=5),
175
dag=dag
176
)
177
178
# Virtual environment task
179
def use_virtualenv_utilities():
180
# Prepare virtual environment
181
venv_dir = prepare_virtualenv(
182
venv_directory="/tmp/my_venv",
183
python_bin="python3.9",
184
requirements=["pandas==1.5.0", "numpy==1.24.0"]
185
)
186
187
# Write Python script for execution
188
script_path = write_python_script(
189
jinja_context={},
190
template_filename="my_script.py.j2",
191
op_args=[],
192
op_kwargs={}
193
)
194
195
return f"Prepared venv at {venv_dir}, script at {script_path}"
196
197
venv_task = PythonOperator(
198
task_id="virtualenv_prep",
199
python_callable=use_virtualenv_utilities,
200
dag=dag
201
)
202
203
# Set task dependencies
204
gate_task >> python_task >> wait_task >> venv_task
205
```