Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility. This module ensures the provider works across different Airflow versions by conditionally importing components based on the detected Airflow version.
Function to determine the current Airflow version for compatibility checks.
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
"""
Get the base Airflow version as a tuple of integers.
Returns:
Tuple of (major, minor, micro) version numbers from the current Airflow installation
"""

Boolean flags that indicate whether specific Airflow version features are available.
AIRFLOW_V_3_0_PLUS: bool
"""
Boolean flag indicating if the current Airflow version is 3.0.0 or higher.
Used to conditionally import or use features that are only available in Airflow 3.0+.
"""
AIRFLOW_V_3_1_PLUS: bool
"""
Boolean flag indicating if the current Airflow version is 3.1.0 or higher.
Used to conditionally import or use features that are only available in Airflow 3.1+.
"""

Classes and types that are conditionally imported based on the Airflow version.
BaseHook: type
"""
Base hook class that is conditionally imported from different modules based on Airflow version.
- Airflow 3.1+: imported from airflow.sdk
- Earlier versions: imported from airflow.hooks.base
"""
EsLogMsgType: type
"""
Type alias for log message types that varies based on Airflow version.
- Airflow 3.0+: list[StructuredLogMessage] | str
- Earlier versions: list[tuple[str, str]]
"""

from airflow.providers.elasticsearch.version_compat import (
AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS,
BaseHook,
EsLogMsgType
)
# Use version flags for conditional behavior
if AIRFLOW_V_3_0_PLUS:
# Use Airflow 3.0+ features
from airflow.utils.log.file_task_handler import StructuredLogMessage
log_messages: EsLogMsgType = [
StructuredLogMessage(timestamp="2024-01-01", message="Log entry")
]
else:
# Use legacy format for older versions
log_messages: EsLogMsgType = [
("2024-01-01", "Log entry")
]
# BaseHook is automatically imported from the correct module
class CustomElasticsearchHook(BaseHook):
def __init__(self):
super().__init__()

from airflow.providers.elasticsearch.version_compat import get_base_airflow_version_tuple
# Check specific version requirements
version = get_base_airflow_version_tuple()
print(f"Running on Airflow {version[0]}.{version[1]}.{version[2]}")
if version >= (3, 0, 0):
print("Using Airflow 3.0+ features")
elif version >= (2, 10, 0):
print("Using Airflow 2.10+ features")
else:
print("Minimum Airflow 2.10.0 required")

from airflow.providers.elasticsearch.version_compat import BaseHook, AIRFLOW_V_3_1_PLUS
class VersionAwareElasticsearchHook(BaseHook):
def __init__(self):
super().__init__()
def get_connection_info(self):
if AIRFLOW_V_3_1_PLUS:
# Use SDK-based connection handling
return self._get_connection_v31()
else:
# Use legacy connection handling
return self._get_connection_legacy()
def _get_connection_v31(self):
# Implementation for Airflow 3.1+
pass
def _get_connection_legacy(self):
# Implementation for earlier versions
pass

from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, EsLogMsgType
def format_log_messages(messages) -> EsLogMsgType:
if AIRFLOW_V_3_0_PLUS:
# Return structured log messages for Airflow 3.0+
from airflow.utils.log.file_task_handler import StructuredLogMessage
return [
StructuredLogMessage(
timestamp=msg['timestamp'],
message=msg['message'],
level=msg.get('level', 'INFO')
)
for msg in messages
]
else:
# Return tuple format for earlier versions
return [
(msg['timestamp'], msg['message'])
for msg in messages
]

The version compatibility module uses several strategies to ensure cross-version compatibility:
# The module branches on version flags (if/else) for conditional imports
if AIRFLOW_V_3_1_PLUS:
from airflow.sdk import BaseHook
else:
from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef]

# Type aliases change based on version
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.log.file_task_handler import StructuredLogMessage
EsLogMsgType = list[StructuredLogMessage] | str
else:
EsLogMsgType = list[tuple[str, str]] # type: ignore[assignment,misc]

# Version detection uses packaging.version for accurate comparison
from packaging.version import Version
from airflow import __version__
airflow_version = Version(__version__)
return airflow_version.major, airflow_version.minor, airflow_version.micro

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch