or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-management.mdindex.mdlineage-entities.mdnotifier-compatibility.mdopenlineage-integration.mdprovider-verification.mdsecurity-permissions.mdstandard-components.mdversion-compatibility.md

standard-components.mddocs/

0

# Standard Components

1

2

Version-compatible standard Airflow operators, triggers, and utilities including PythonOperator, TimeDeltaTrigger, and virtualenv preparation functions. These components provide consistent interfaces to core Airflow functionality across different versions.

3

4

## Capabilities

5

6

### Python Operator

7

8

Version-compatible Python operator for executing Python functions in Airflow tasks.

9

10

```python { .api }

11

class PythonOperator(BaseOperator):

12

"""

13

Version-compatible Python operator.

14

15

Maps to airflow.providers.standard.operators.python.PythonOperator in Airflow 3.0+

16

Maps to airflow.operators.python.PythonOperator in Airflow < 3.0

17

"""

18

```

19

20

### Short Circuit Operator

21

22

Operator that allows conditional workflow execution by short-circuiting downstream tasks.

23

24

```python { .api }

25

class ShortCircuitOperator(BaseOperator):

26

"""

27

Version-compatible short circuit operator.

28

29

Maps to airflow.providers.standard.operators.python.ShortCircuitOperator in Airflow 3.0+

30

Maps to airflow.operators.python.ShortCircuitOperator in Airflow < 3.0

31

"""

32

```

33

34

### Serializers

35

36

Serialization utilities for Python operator data persistence.

37

38

```python { .api }

39

_SERIALIZERS: dict

40

"""

41

Serializers for Python operator data.

42

Contains serialization functions for various data types.

43

"""

44

```

45

46

### Context Functions

47

48

Utilities for accessing Airflow execution context within tasks.

49

50

```python { .api }

51

def get_current_context():

52

"""

53

Get the current Airflow execution context.

54

55

Returns:

56

Context: Current task execution context containing dag_run, task_instance, etc.

57

58

Maps to airflow.providers.standard.operators.python.get_current_context in Airflow 3.0+

59

Maps to airflow.operators.python.get_current_context in Airflow < 3.0

60

"""

61

```

62

63

### Time Delta Trigger

64

65

Deferrable trigger that waits for a specified time duration.

66

67

```python { .api }

68

class TimeDeltaTrigger:

69

"""

70

Time delta trigger for deferrable operators.

71

72

Maps to airflow.providers.standard.triggers.temporal.TimeDeltaTrigger in Airflow 3.0+

73

Maps to airflow.triggers.temporal.TimeDeltaTrigger in Airflow < 3.0

74

"""

75

```

76

77

### Virtual Environment Utilities

78

79

Functions for preparing and managing Python virtual environments for task execution.

80

81

```python { .api }

82

def write_python_script(...):

83

"""

84

Write Python scripts for virtualenv execution.

85

86

Maps to airflow.providers.standard.operators.python.write_python_script in Airflow 3.0+

87

Maps to airflow.operators.python.write_python_script in Airflow < 3.0

88

"""

89

90

def prepare_virtualenv(...):

91

"""

92

Prepare virtual environment for Python execution.

93

94

Maps to airflow.providers.standard.operators.python.prepare_virtualenv in Airflow 3.0+

95

Maps to airflow.operators.python.prepare_virtualenv in Airflow < 3.0

96

"""

97

```

98

99

## Usage Examples

100

101

```python

102

from airflow.providers.common.compat.standard.operators import (

103

PythonOperator,

104

ShortCircuitOperator,

105

get_current_context

106

)

107

from airflow.providers.common.compat.standard.triggers import TimeDeltaTrigger

108

from airflow.providers.common.compat.standard.utils import write_python_script, prepare_virtualenv

109

110

from airflow import DAG

111

from datetime import datetime, timedelta

112

113

# Create DAG

114

dag = DAG(

115

"example_standard_components",

116

start_date=datetime(2024, 1, 1),

117

schedule_interval=timedelta(days=1),

118

catchup=False

119

)

120

121

# Python task function

122

def my_python_function(**context):

123

# Access current context

124

current_context = get_current_context()

125

126

print(f"Task ID: {current_context['task_instance'].task_id}")

127

print(f"Execution date: {current_context['execution_date']}")

128

129

return "Task completed successfully"

130

131

# Create PythonOperator task

132

python_task = PythonOperator(

133

task_id="python_task",

134

python_callable=my_python_function,

135

dag=dag

136

)

137

138

# Short circuit condition function

139

def should_continue(**context):

140

# Some business logic to determine if workflow should continue

141

execution_date = context['execution_date']

142

return execution_date.weekday() < 5 # Only run on weekdays

143

144

# Create ShortCircuitOperator task

145

gate_task = ShortCircuitOperator(

146

task_id="weekday_gate",

147

python_callable=should_continue,

148

dag=dag

149

)

150

151

# Deferrable task using TimeDeltaTrigger

152

from airflow.sensors.base import BaseSensorOperator

153

154

class WaitSensor(BaseSensorOperator):

155

def __init__(self, wait_duration: timedelta, **kwargs):

156

super().__init__(**kwargs)

157

self.wait_duration = wait_duration

158

159

def execute(self, context):

160

if not self.poke(context):

161

self.defer(

162

trigger=TimeDeltaTrigger(delta=self.wait_duration),

163

method_name="execute_complete"

164

)

165

166

def poke(self, context):

167

return False # Always defer

168

169

def execute_complete(self, context, event):

170

return "Wait completed"

171

172

wait_task = WaitSensor(

173

task_id="wait_5_minutes",

174

wait_duration=timedelta(minutes=5),

175

dag=dag

176

)

177

178

# Virtual environment task

179

def use_virtualenv_utilities():

180

# Prepare virtual environment

181

venv_dir = prepare_virtualenv(

182

venv_directory="/tmp/my_venv",

183

python_bin="python3.9",

184

requirements=["pandas==1.5.0", "numpy==1.24.0"]

185

)

186

187

# Write Python script for execution

188

script_path = write_python_script(

189

jinja_context={},

190

template_filename="my_script.py.j2",

191

op_args=[],

192

op_kwargs={}

193

)

194

195

return f"Prepared venv at {venv_dir}, script at {script_path}"

196

197

venv_task = PythonOperator(

198

task_id="virtualenv_prep",

199

python_callable=use_virtualenv_utilities,

200

dag=dag

201

)

202

203

# Set task dependencies

204

gate_task >> python_task >> wait_task >> venv_task

205

```