Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
Quality: pending — a best-practices review has not yet been performed.
Impact: pending — no eval scenarios have been run.
Database-like interface for Elasticsearch that provides PEP 249-compliant access using Elasticsearch's SQL API. This hook enables SQL query execution against Elasticsearch indexes with full cursor support and connection management.
Main hook class that extends Airflow's DbApiHook to provide database-like access to Elasticsearch clusters through the SQL API.
class ElasticsearchSQLHook(DbApiHook):
    """
    Interact with Elasticsearch through the elasticsearch-dbapi.

    Provides PEP 249 (DB-API)-style access to Elasticsearch via its SQL API,
    with cursor support and connection management handled by ``DbApiHook``.
    This hook uses the Elasticsearch conn_id.

    :param elasticsearch_conn_id: The ElasticSearch connection id used for Elasticsearch credentials.
    """

    # Attributes Airflow's DbApiHook machinery uses to resolve and display
    # this hook's connection.
    conn_name_attr = "elasticsearch_conn_id"
    default_conn_name = "elasticsearch_default"
    connector = ESConnection  # DB-API connection factory used by get_conn
    conn_type = "elasticsearch"
    hook_name = "Elasticsearch"

    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs):
        """
        Initialize the Elasticsearch SQL hook.

        :param schema: URL scheme used to reach the cluster (default: ``"http"``).
        :param connection: Optional pre-built Airflow connection object;
            presumably resolved from ``elasticsearch_conn_id`` when omitted —
            confirm against the implementation.
        """

    def get_conn(self) -> ESConnection:
        """
        Return an Elasticsearch DB-API connection object.

        :return: ``ESConnection`` configured with the Airflow connection parameters.
        """

    def get_uri(self) -> str:
        """
        Return the connection URI string.

        :return: String representation of the connection URI.
        """

    def _get_polars_df(
        self,
        sql: str,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        **kwargs
    ):
        """
        Get a Polars DataFrame from a SQL query (not currently supported).

        :param sql: SQL query string.
        :param parameters: Query parameters.
        :param kwargs: Additional arguments.
        :raises NotImplementedError: Polars is not supported for Elasticsearch.
        """

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook
# Example: run a SQL query against Elasticsearch through the hook.

# Initialize with connection ID
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')

# Get connection and execute query
conn = hook.get_conn()
result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

# Process results
for row in result['rows']:
    print(row)

# Example: inspect the connection URI built from the Airflow connection.
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
uri = hook.get_uri()
print(f"Connection URI: {uri}")
# Output: elasticsearch+http://user:password@localhost:9200/?param=value

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook
class CustomElasticsearchHook(ElasticsearchSQLHook):
def custom_query_method(self, index_pattern: str):
conn = self.get_conn()
return conn.execute_sql(f"SELECT * FROM {index_pattern}")The hook uses Airflow connections with the following parameters:
Example connection extra parameters:

    {
        "http_compress": true,
        "verify_certs": false,
        "fetch_size": 1000,
        "field_multi_value_leniency": true
    }

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch