
tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities


Version Compatibility

The airflow.providers.elasticsearch.version_compat module handles differences between Airflow releases. It detects the installed Airflow version, exposes boolean flags for version checks, and provides components such as BaseHook from whichever location the detected version supports, so the provider works across Airflow versions.

Capabilities

Version Detection Function

Returns the installed Airflow version as a tuple so callers can run compatibility checks.

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple of integers.
    
    Returns:
    Tuple of (major, minor, micro) version numbers from the current Airflow installation
    """

Version Compatibility Constants

Boolean flags that indicate whether specific Airflow version features are available.

AIRFLOW_V_3_0_PLUS: bool
"""
Boolean flag indicating if the current Airflow version is 3.0.0 or higher.
Used to conditionally import or use features that are only available in Airflow 3.0+.
"""

AIRFLOW_V_3_1_PLUS: bool
"""
Boolean flag indicating if the current Airflow version is 3.1.0 or higher.
Used to conditionally import or use features that are only available in Airflow 3.1+.
"""

Conditional Imports

Classes and type aliases whose import location or definition depends on the Airflow version.

BaseHook: type
"""
Base hook class that is conditionally imported from different modules based on Airflow version.
- Airflow 3.1+: imported from airflow.sdk
- Earlier versions: imported from airflow.hooks.base
"""

EsLogMsgType: type
"""
Type alias for log message types that varies based on Airflow version.
- Airflow 3.0+: list[StructuredLogMessage] | str
- Earlier versions: list[tuple[str, str]]
"""

Usage Examples

Version-Dependent Code

from airflow.providers.elasticsearch.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    AIRFLOW_V_3_1_PLUS,
    BaseHook,
    EsLogMsgType
)

# Use version flags for conditional behavior
if AIRFLOW_V_3_0_PLUS:
    # Use Airflow 3.0+ features
    from airflow.utils.log.file_task_handler import StructuredLogMessage
    log_messages: EsLogMsgType = [
        # StructuredLogMessage carries its text in the `event` field
        StructuredLogMessage(timestamp="2024-01-01T00:00:00Z", event="Log entry")
    ]
else:
    # Use legacy format for older versions
    log_messages: EsLogMsgType = [
        ("2024-01-01", "Log entry")
    ]

# BaseHook is automatically imported from the correct module
class CustomElasticsearchHook(BaseHook):
    def __init__(self):
        super().__init__()

Version Detection

from airflow.providers.elasticsearch.version_compat import get_base_airflow_version_tuple

# Check specific version requirements
version = get_base_airflow_version_tuple()
print(f"Running on Airflow {version[0]}.{version[1]}.{version[2]}")

if version >= (3, 0, 0):
    print("Using Airflow 3.0+ features")
elif version >= (2, 10, 0):
    print("Using Airflow 2.10+ features")
else:
    print("Minimum Airflow 2.10.0 required")
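
The comparisons above work because Python compares tuples element by element, which is why the version is exposed as a tuple of integers rather than as a string. The sketch below illustrates this with a hypothetical helper, compat_flags, that shows how boolean flags such as AIRFLOW_V_3_0_PLUS could be derived from a version tuple. The helper name and the dictionary it returns are assumptions for illustration and are not part of the provider.

```python
def compat_flags(version: tuple[int, int, int]) -> dict[str, bool]:
    """Hypothetical helper: derive feature flags from a (major, minor, micro) tuple."""
    return {
        "AIRFLOW_V_3_0_PLUS": version >= (3, 0, 0),
        "AIRFLOW_V_3_1_PLUS": version >= (3, 1, 0),
    }


# Integer tuples order versions numerically...
assert (2, 10, 0) > (2, 9, 0)
# ...whereas plain strings compare character by character and get it wrong.
assert "2.10.0" < "2.9.0"

print(compat_flags((3, 0, 2)))
```

Comparing against a full three-element tuple, as in the example above, keeps every check in the same shape and avoids surprises from comparing tuples of different lengths.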

Hook Development

from airflow.providers.elasticsearch.version_compat import BaseHook, AIRFLOW_V_3_1_PLUS

class VersionAwareElasticsearchHook(BaseHook):
    def __init__(self):
        super().__init__()
        
    def get_connection_info(self):
        if AIRFLOW_V_3_1_PLUS:
            # Use SDK-based connection handling
            return self._get_connection_v31()
        else:
            # Use legacy connection handling
            return self._get_connection_legacy()
            
    def _get_connection_v31(self):
        # Implementation for Airflow 3.1+
        pass
        
    def _get_connection_legacy(self):
        # Implementation for earlier versions
        pass
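
Code that branches on a version flag has two paths, and only one of them runs on any given installation. One way to exercise both in a unit test is to override the flag temporarily. The sketch below does this with unittest.mock against a stand-in class so that it runs without Airflow installed; StandInHook, connection_info_with_flag, and the returned strings are hypothetical names chosen for illustration.

```python
from unittest import mock

# Stand-in for the flag that real code would import from version_compat.
AIRFLOW_V_3_1_PLUS = False


class StandInHook:
    """Hypothetical hook with the same branching as the example above."""

    def get_connection_info(self) -> str:
        if AIRFLOW_V_3_1_PLUS:
            return "sdk"
        return "legacy"


def connection_info_with_flag(value: bool) -> str:
    """Temporarily override the flag, call the hook, then restore the flag."""
    hook_globals = StandInHook.get_connection_info.__globals__
    with mock.patch.dict(hook_globals, {"AIRFLOW_V_3_1_PLUS": value}):
        return StandInHook().get_connection_info()


print(connection_info_with_flag(True))   # sdk
print(connection_info_with_flag(False))  # legacy
```

In a real test suite the flag would usually be patched by name in the module that imported it, since a from-import binds a separate name in that module.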

Logging Compatibility

from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, EsLogMsgType

def format_log_messages(messages) -> EsLogMsgType:
    if AIRFLOW_V_3_0_PLUS:
        # Return structured log messages for Airflow 3.0+
        from airflow.utils.log.file_task_handler import StructuredLogMessage
        return [
            StructuredLogMessage(
                timestamp=msg['timestamp'],
                # StructuredLogMessage carries its text in the `event` field
                event=msg['message'],
                level=msg.get('level', 'INFO')
            )
            for msg in messages
        ]
    else:
        # Return tuple format for earlier versions
        return [
            (msg['timestamp'], msg['message'])
            for msg in messages
        ]
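
Because EsLogMsgType can describe a plain string, a list of tuples, or a list of structured messages, code that consumes log output may need to accept all three shapes. The following sketch flattens any of them into text lines. It uses a small dataclass, StandInStructuredMessage, as a hypothetical stand-in for Airflow's StructuredLogMessage so that it runs without Airflow installed; to_plain_lines is likewise an illustrative name, and treating the second tuple element as the text is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class StandInStructuredMessage:
    """Hypothetical stand-in for StructuredLogMessage (not the real class)."""

    event: str
    timestamp: Optional[datetime] = None


def to_plain_lines(messages) -> list[str]:
    """Flatten any of the three message shapes into a list of text lines."""
    if isinstance(messages, str):
        return messages.splitlines()
    lines = []
    for item in messages:
        if isinstance(item, tuple):
            # Legacy shape: a pair of strings; the second is treated as the text.
            lines.append(item[1])
        else:
            # Structured shape: the text is read from the `event` attribute.
            lines.append(item.event)
    return lines


print(to_plain_lines([("2024-01-01", "Log entry")]))
print(to_plain_lines([StandInStructuredMessage(event="Log entry")]))
```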

Implementation Notes

The version compatibility module uses several strategies to ensure cross-version compatibility:

Conditional Imports

# The module selects the import with an if/else on the version flag
if AIRFLOW_V_3_1_PLUS:
    from airflow.sdk import BaseHook
else:
    from airflow.hooks.base import BaseHook  # type: ignore[attr-defined,no-redef]

Type Aliasing

# Type aliases change based on version
if AIRFLOW_V_3_0_PLUS:
    from airflow.utils.log.file_task_handler import StructuredLogMessage
    EsLogMsgType = list[StructuredLogMessage] | str
else:
    EsLogMsgType = list[tuple[str, str]]  # type: ignore[assignment,misc]

Version Detection

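# Version detection uses packaging.version for accurate comparison
from packaging.version import Version
from airflow import __version__

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    # Only the release numbers are kept; pre-release and build suffixes are dropped
    airflow_version = Version(__version__)
    return airflow_version.major, airflow_version.minor, airflow_version.micro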

Notes

  • This module is deliberately copied into provider packages to avoid cross-provider dependencies
  • All version checks use the base version (without pre-release or build metadata)
  • The module provides a stable interface that remains consistent across provider versions
  • Type annotations are carefully managed to work with both old and new Airflow versions
  • The module follows the same pattern used across all Airflow provider packages
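
To illustrate what "base version" means in the notes above, here is a minimal sketch that reduces a version string to its release numbers using only the standard library. The function name base_version_tuple and the regular expression are assumptions for illustration. The module itself relies on packaging.version, as shown under Implementation Notes, and this stand-in only handles strings that start with major.minor.micro.

```python
import re


def base_version_tuple(version: str) -> tuple[int, int, int]:
    """Hypothetical stand-in: keep only the leading major.minor.micro numbers."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor, micro = (int(part) for part in match.groups())
    return major, minor, micro


# A release candidate and a development build reduce to their base versions.
print(base_version_tuple("3.0.0rc1"))        # (3, 0, 0)
print(base_version_tuple("3.1.0.dev0+abc"))  # (3, 1, 0)
```

One consequence of comparing base versions is that a pre-release such as 3.0.0rc1 satisfies a check against (3, 0, 0).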

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch
