
tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities

Elasticsearch SQL Hook

Database-like interface for Elasticsearch that provides PEP 249-style access through Elasticsearch's SQL API. The hook runs SQL queries against Elasticsearch indices and returns results through a DB-API-style connection and cursor.

Capabilities

SQL Hook Class

Main hook class that extends Airflow's DbApiHook to provide database-like access to Elasticsearch clusters through the SQL API.

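from __future__ import annotations

from collections.abc import Mapping
from typing import Any

from airflow.models import Connection as AirflowConnection
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.elasticsearch.hooks.elasticsearch import ESConnection

# Signatures and docstrings only; method bodies are omitted.
class ElasticsearchSQLHook(DbApiHook):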
    """
    Interact with Elasticsearch through the elasticsearch-dbapi.
    
    This hook uses the Elasticsearch conn_id.
    
    :param elasticsearch_conn_id: The Elasticsearch connection id used for Elasticsearch credentials.
    """
    
    conn_name_attr = "elasticsearch_conn_id"
    default_conn_name = "elasticsearch_default"
    connector = ESConnection
    conn_type = "elasticsearch"
    hook_name = "Elasticsearch"
    
    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs):
        """
        Initialize the Elasticsearch SQL Hook.
        
        Parameters:
        - schema: Connection schema (default: "http")
        - connection: Optional Airflow connection object to use
        - *args, **kwargs: Passed to DbApiHook, e.g. elasticsearch_conn_id
        """
        
    def get_conn(self) -> ESConnection:
        """
        Return an elasticsearch connection object.
        
        Returns:
        ESConnection configured with connection parameters
        """
        
    def get_uri(self) -> str:
        """
        Return the connection URI string.
        
        Returns:
        String representation of the connection URI
        """
        
    def _get_polars_df(
        self,
        sql: str,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        **kwargs
    ):
        """
        Get Polars DataFrame from SQL query (not currently supported).
        
        Parameters:
        - sql: SQL query string
        - parameters: Query parameters
        - **kwargs: Additional arguments
        
        Raises:
        NotImplementedError: Polars is not supported for Elasticsearch
        """

Usage Examples

Basic Hook Usage

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Initialize with connection ID
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')

# Get connection and execute query
conn = hook.get_conn()
try:
    result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

    # Each row is a list of values ordered like result['columns']
    for row in result['rows']:
        print(row)
finally:
    # Release the underlying Elasticsearch client
    conn.close()
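Because each row comes back as a plain list ordered like the response's columns, a small helper can pair names with values. rows_as_dicts is a hypothetical name, and the sample response below is made up to match the SQL API's JSON shape; the helper accepts any mapping with "columns" and "rows" keys.

```python
from typing import Any, Mapping


def rows_as_dicts(response: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Hypothetical helper: turn SQL API rows into {column_name: value} dicts."""
    # Column names, in the same order as the values inside every row.
    names = [col["name"] for col in response["columns"]]
    return [dict(zip(names, row)) for row in response["rows"]]


sample = {
    "columns": [{"name": "id", "type": "keyword"}, {"name": "status", "type": "keyword"}],
    "rows": [["a1", "active"], ["b2", "active"]],
}
for record in rows_as_dicts(sample):
    print(record)
```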

Connection URI Generation

hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
uri = hook.get_uri()
print(f"Connection URI: {uri}")
# Example output (actual values come from the connection):
# elasticsearch+http://user:password@localhost:9200/?param=value
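The URI follows the pattern conn_type+scheme://login:password@host:port/?key=value. The sketch below rebuilds that format from individual fields, assuming credentials are URL-encoded and extras are appended as query parameters. build_es_uri is hypothetical and is not the hook's own code.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import quote_plus


def build_es_uri(
    conn_type: str,
    scheme: str,
    host: str,
    port: int | None = None,
    login: str | None = None,
    password: str | None = None,
    extra: dict[str, Any] | None = None,
) -> str:
    """Hypothetical helper that mimics the URI format shown above."""
    # URL-encode credentials so characters such as "@" or ":" cannot break the URI.
    auth = f"{quote_plus(login)}:{quote_plus(password or '')}@" if login else ""
    netloc = f"{host}:{port}" if port is not None else host
    uri = f"{conn_type}+{scheme}://{auth}{netloc}/"
    if extra:
        # Extras become plain key=value query parameters (not URL-encoded in this sketch).
        uri += "?" + "&".join(f"{key}={value}" for key, value in extra.items())
    return uri


print(build_es_uri("elasticsearch", "http", "localhost", 9200, "user", "p@ss", {"param": "value"}))
# elasticsearch+http://user:p%40ss@localhost:9200/?param=value
```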

Inheritance and Extension

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

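class CustomElasticsearchHook(ElasticsearchSQLHook):
    def custom_query_method(self, index_pattern: str):
        conn = self.get_conn()
        # index_pattern is interpolated directly into the SQL text, so pass only
        # trusted values; double quotes let patterns such as logs-* parse as one identifier.
        return conn.execute_sql(f'SELECT * FROM "{index_pattern}"')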
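Building SQL with an f-string puts whatever index_pattern contains straight into the query text. One option, sketched here, is to validate the pattern against an allowlist before interpolating it and to wrap it in double quotes, since the Elasticsearch SQL documentation quotes index names and patterns containing characters such as - or *. quoted_index_pattern and its allowlist are hypothetical and not part of the provider; the resulting string would be passed to execute_sql as before.

```python
import re

# Hypothetical allowlist: letters, digits, ".", "_", "-" and the "*" wildcard.
# Tighten or loosen it to match your own index naming rules.
_INDEX_PATTERN = re.compile(r"[A-Za-z0-9._*-]+")


def quoted_index_pattern(index_pattern: str) -> str:
    """Validate an index name or pattern and wrap it in double quotes."""
    if not _INDEX_PATTERN.fullmatch(index_pattern):
        raise ValueError(f"Refusing to interpolate index pattern {index_pattern!r}")
    # Double quotes keep names with "-" or "*" together as one SQL identifier.
    return f'"{index_pattern}"'


query = f"SELECT * FROM {quoted_index_pattern('logs-2024.*')}"
print(query)  # SELECT * FROM "logs-2024.*"
```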

Configuration

The hook uses Airflow connections with the following parameters:

  • Host: Elasticsearch server hostname
  • Port: Elasticsearch server port (Elasticsearch listens on 9200 by default)
  • Login: Username for authentication
  • Password: Password for authentication
  • Schema: URL scheme, http or https
  • Extra: Additional connection parameters as JSON
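As an illustration of how these fields could be folded into keyword arguments for a connection object, the sketch below assumes that an empty Login or Password means no authentication, that an empty Schema falls back to http, and that the Extra JSON is merged on top. connection_kwargs is a hypothetical helper, not the hook's actual get_conn() code.

```python
from __future__ import annotations

import json
from typing import Any


def connection_kwargs(
    host: str,
    port: int | None,
    login: str | None,
    password: str | None,
    schema: str | None,
    extra: str | None,
) -> dict[str, Any]:
    """Hypothetical mapping from Airflow connection fields to client kwargs."""
    kwargs: dict[str, Any] = {
        "host": host,
        "port": port,
        "user": login or None,          # empty string -> no user
        "password": password or None,   # empty string -> no password
        "scheme": schema or "http",     # Schema field holds http/https
    }
    # Extra is stored as a JSON string; its keys are merged on top.
    kwargs.update(json.loads(extra) if extra else {})
    return kwargs


print(connection_kwargs("localhost", 9200, "", "", "", '{"verify_certs": false}'))
```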

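Example connection extra parameters. Here fetch_size and field_multi_value_leniency are Elasticsearch SQL API settings, while http_compress and verify_certs are Elasticsearch Python client options: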

{
    "http_compress": true,
    "verify_certs": false,
    "fetch_size": 1000,
    "field_multi_value_leniency": true
}
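One way to supply such a connection without the UI is Airflow's JSON connection format in an AIRFLOW_CONN_{CONN_ID} environment variable (available since Airflow 2.3). The sketch builds that value in Python only for illustration; in practice the variable is set in the environment of the scheduler and workers. All host, credential, and extra values are placeholders.

```python
import json
import os

conn_id = "my_elasticsearch_conn"
# Airflow looks up AIRFLOW_CONN_<CONN_ID> with the conn_id upper-cased.
env_name = f"AIRFLOW_CONN_{conn_id.upper()}"

# Placeholder values; "extra" may be given as a nested JSON object.
os.environ[env_name] = json.dumps(
    {
        "conn_type": "elasticsearch",
        "host": "localhost",
        "port": 9200,
        "login": "user",
        "password": "password",
        "schema": "https",
        "extra": {"verify_certs": False, "fetch_size": 1000},
    }
)
print(env_name)  # AIRFLOW_CONN_MY_ELASTICSEARCH_CONN
```

Setting os.environ affects only the current process and its children, so this form is mainly useful in tests.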

Notes

  • The hook provides PEP 249-style database API access through the ESConnection wrapper
  • Polars DataFrame integration is not supported (raises NotImplementedError)
  • All SQL operations are handled through Elasticsearch's SQL API
  • Connection parameters are automatically extracted from Airflow connections
  • Apart from the unsupported Polars path, the hook inherits the generic DbApiHook methods, such as get_records() and run()
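Because every query goes through the SQL API, large results arrive in pages of fetch_size rows, with a cursor token in the response whenever more rows remain. The loop below sketches that protocol against a fake backend: the first request carries the query, and follow-up requests carry only the cursor. iter_sql_rows and fake_paged_query are hypothetical; a real client would send these bodies to POST /_sql and close an abandoned cursor with POST /_sql/close.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator


def iter_sql_rows(
    query_fn: Callable[[dict[str, Any]], dict[str, Any]], sql: str, fetch_size: int = 1000
) -> Iterator[list[Any]]:
    """Yield every row, following the SQL API cursor page by page (sketch)."""
    response = query_fn({"query": sql, "fetch_size": fetch_size})
    while True:
        yield from response.get("rows", [])
        cursor = response.get("cursor")
        if not cursor:
            break  # no cursor means this was the last page
        # Follow-up pages are requested with the cursor alone.
        response = query_fn({"cursor": cursor})


# Hypothetical paged backend: serves 5 rows, fetch_size at a time.
# Its cursor encodes "offset:size"; a real cursor is an opaque string.
_DATA = [[n] for n in range(5)]


def fake_paged_query(body: dict[str, Any]) -> dict[str, Any]:
    if "cursor" in body:
        start, size = map(int, body["cursor"].split(":"))
    else:
        start, size = 0, body["fetch_size"]
    response: dict[str, Any] = {"rows": _DATA[start:start + size]}
    if start + size < len(_DATA):
        response["cursor"] = f"{start + size}:{size}"
    return response


rows = list(iter_sql_rows(fake_paged_query, "SELECT n FROM numbers", fetch_size=2))
print(rows)  # [[0], [1], [2], [3], [4]]
```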

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch
