0
# Version Compatibility
1
2
Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility. This module ensures the provider works across different Airflow versions by conditionally importing components based on the detected Airflow version.
3
4
## Capabilities
5
6
### Version Detection Function
7
8
Function to determine the current Airflow version for compatibility checks.
9
10
```python { .api }
11
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
12
"""
13
Get the base Airflow version as a tuple of integers.
14
15
Returns:
16
Tuple of (major, minor, micro) version numbers from the current Airflow installation
17
"""
18
```
19
20
### Version Compatibility Constants
21
22
Boolean flags that indicate whether specific Airflow version features are available.
23
24
```python { .api }
25
AIRFLOW_V_3_0_PLUS: bool
26
"""
27
Boolean flag indicating if the current Airflow version is 3.0.0 or higher.
28
Used to conditionally import or use features that are only available in Airflow 3.0+.
29
"""
30
31
AIRFLOW_V_3_1_PLUS: bool
32
"""
33
Boolean flag indicating if the current Airflow version is 3.1.0 or higher.
34
Used to conditionally import or use features that are only available in Airflow 3.1+.
35
"""
36
```
37
38
### Conditional Imports
39
40
Classes and types that are conditionally imported based on the Airflow version.
41
42
```python { .api }
43
BaseHook: type
44
"""
45
Base hook class that is conditionally imported from different modules based on Airflow version.
46
- Airflow 3.1+: imported from airflow.sdk
47
- Earlier versions: imported from airflow.hooks.base
48
"""
49
50
EsLogMsgType: type
51
"""
52
Type alias for log message types that varies based on Airflow version.
53
- Airflow 3.0+: list[StructuredLogMessage] | str
54
- Earlier versions: list[tuple[str, str]]
55
"""
56
```
57
58
### Usage Examples
59
60
#### Version-Dependent Code
61
62
```python
63
from airflow.providers.elasticsearch.version_compat import (
64
AIRFLOW_V_3_0_PLUS,
65
AIRFLOW_V_3_1_PLUS,
66
BaseHook,
67
EsLogMsgType
68
)
69
70
# Use version flags for conditional behavior
71
if AIRFLOW_V_3_0_PLUS:
72
# Use Airflow 3.0+ features
73
from airflow.utils.log.file_task_handler import StructuredLogMessage
74
log_messages: EsLogMsgType = [
75
StructuredLogMessage(timestamp="2024-01-01", message="Log entry")
76
]
77
else:
78
# Use legacy format for older versions
79
log_messages: EsLogMsgType = [
80
("2024-01-01", "Log entry")
81
]
82
83
# BaseHook is automatically imported from the correct module
84
class CustomElasticsearchHook(BaseHook):
85
def __init__(self):
86
super().__init__()
87
```
88
89
#### Version Detection
90
91
```python
92
from airflow.providers.elasticsearch.version_compat import get_base_airflow_version_tuple
93
94
# Check specific version requirements
95
version = get_base_airflow_version_tuple()
96
print(f"Running on Airflow {version[0]}.{version[1]}.{version[2]}")
97
98
if version >= (3, 0, 0):
99
print("Using Airflow 3.0+ features")
100
elif version >= (2, 10, 0):
101
print("Using Airflow 2.10+ features")
102
else:
103
print("Minimum Airflow 2.10.0 required")
104
```
105
106
#### Hook Development
107
108
```python
109
from airflow.providers.elasticsearch.version_compat import BaseHook, AIRFLOW_V_3_1_PLUS
110
111
class VersionAwareElasticsearchHook(BaseHook):
112
def __init__(self):
113
super().__init__()
114
115
def get_connection_info(self):
116
if AIRFLOW_V_3_1_PLUS:
117
# Use SDK-based connection handling
118
return self._get_connection_v31()
119
else:
120
# Use legacy connection handling
121
return self._get_connection_legacy()
122
123
def _get_connection_v31(self):
124
# Implementation for Airflow 3.1+
125
pass
126
127
def _get_connection_legacy(self):
128
# Implementation for earlier versions
129
pass
130
```
131
132
#### Logging Compatibility
133
134
```python
135
from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, EsLogMsgType
136
137
def format_log_messages(messages) -> EsLogMsgType:
138
if AIRFLOW_V_3_0_PLUS:
139
# Return structured log messages for Airflow 3.0+
140
from airflow.utils.log.file_task_handler import StructuredLogMessage
141
return [
142
StructuredLogMessage(
143
timestamp=msg['timestamp'],
144
message=msg['message'],
145
level=msg.get('level', 'INFO')
146
)
147
for msg in messages
148
]
149
else:
150
# Return tuple format for earlier versions
151
return [
152
(msg['timestamp'], msg['message'])
153
for msg in messages
154
]
155
```
156
157
### Implementation Notes
158
159
The version compatibility module uses several strategies to ensure cross-version compatibility:
160
161
#### Conditional Imports
162
```python
163
# The module uses try/except blocks for conditional imports
164
if AIRFLOW_V_3_1_PLUS:
165
from airflow.sdk import BaseHook
166
else:
167
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]
168
```
169
170
#### Type Aliasing
171
```python
172
# Type aliases change based on version
173
if AIRFLOW_V_3_0_PLUS:
174
from airflow.utils.log.file_task_handler import StructuredLogMessage
175
EsLogMsgType = list[StructuredLogMessage] | str
176
else:
177
EsLogMsgType = list[tuple[str, str]] # type: ignore[assignment,misc]
178
```
179
180
#### Version Detection
181
```python
182
# Version detection uses packaging.version for accurate comparison
183
from packaging.version import Version
184
from airflow import __version__
185
186
airflow_version = Version(__version__)
187
return airflow_version.major, airflow_version.minor, airflow_version.micro
188
```
189
190
### Notes
191
192
- This module is deliberately copied into provider packages to avoid cross-provider dependencies
193
- All version checks use the base version (without pre-release or build metadata)
194
- The module provides a stable interface that remains consistent across provider versions
195
- Type annotations are carefully managed to work with both old and new Airflow versions
196
- The module follows the same pattern used across all Airflow provider packages