Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
```bash
npx @tessl/cli install tessl/pypi-apache-airflow-providers-elasticsearch@6.3.0
```

A provider package that enables Elasticsearch integration for Apache Airflow workflows. This package provides hooks for connecting to Elasticsearch clusters, logging capabilities that write task logs directly to Elasticsearch indexes, and SQL query execution against Elasticsearch using the SQL API.
```bash
pip install apache-airflow-providers-elasticsearch
```

Standard imports for using the provider:

```python
# Main package version
from airflow.providers.elasticsearch import __version__
# SQL Hook for database-like operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook
# Python Hook for native Elasticsearch operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
# Task logging handler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
# JSON formatter for structured logging
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
# Response classes for handling Elasticsearch results
from airflow.providers.elasticsearch.log.es_response import (
AttributeList, AttributeDict, Hit, HitMeta, ElasticSearchResponse
)
# Version compatibility utilities
from airflow.providers.elasticsearch.version_compat import (
get_base_airflow_version_tuple, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS,
BaseHook, EsLogMsgType
)
```

Connection components:

```python
# Direct connection utilities
from airflow.providers.elasticsearch.hooks.elasticsearch import connect, ESConnection, ElasticsearchSQLCursor
```

Executing SQL queries through an Airflow connection with `ElasticsearchSQLHook`:

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook
# Using Airflow connection
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
conn = hook.get_conn()
# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index LIMIT 10")
print(result)
```

Running native Elasticsearch operations with `ElasticsearchPythonHook`:

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
# Initialize with hosts
hook = ElasticsearchPythonHook(
hosts=["http://localhost:9200"],
es_conn_args={"basic_auth": ("user", "password")}
)
# Perform search
query = {
"query": {
"match": {
"message": "error"
}
}
}
results = hook.search(query=query, index="logs-*")
print(f"Found {len(results['hits'])} results")
```

Writing task logs to Elasticsearch is configured through the `[elasticsearch]` section of `airflow.cfg`:

```ini
# In airflow.cfg
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
```
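With the configuration above, task logs are shipped to the `airflow-logs` index. As a minimal sketch of how to confirm that logs are arriving (assuming a reachable cluster at `http://localhost:9200`, that at least one task has run, and that `search` returns the `hits` section as in the earlier example):

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Illustrative check that task logs reached the configured target_index.
hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
query = {"query": {"match_all": {}}}

shipped = hook.search(query=query, index="airflow-logs")
print(f"Found {len(shipped['hits'])} log documents in airflow-logs")
```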
The provider follows Airflow's standard provider architecture with three main components: hooks, task log handling, and low-level SQL connection utilities. It supports SQL-based queries through the Elasticsearch SQL API and native Elasticsearch operations through the official Python client, and its configurable logging can write task output to Elasticsearch indexes with optional JSON formatting and Kibana frontend integration.
Database-like interface using Elasticsearch's SQL API with PEP 249 compliance. Supports connection management, cursor operations, and SQL query execution against Elasticsearch indexes.

```python
class ElasticsearchSQLHook(DbApiHook):
    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs): ...
    def get_conn(self) -> ESConnection: ...
    def get_uri(self) -> str: ...
```
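Because the hook is PEP 249 compliant, results can also be read through a cursor. This is a minimal sketch assuming the cursor exposes the standard `execute`/`fetchall` methods implied above; the index and field names are illustrative:

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

hook = ElasticsearchSQLHook(elasticsearch_conn_id="my_elasticsearch_conn")
conn = hook.get_conn()

# PEP 249 style access: open a cursor, execute a SQL query, fetch rows.
cursor = conn.cursor()
cursor.execute("SELECT field_a, field_b FROM my_index LIMIT 5")  # hypothetical fields
for row in cursor.fetchall():
    print(row)
```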
Native Elasticsearch operations using the official Python client. Provides direct access to Elasticsearch APIs for search, indexing, and cluster management operations.

```python
class ElasticsearchPythonHook(BaseHook):
    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None): ...
    def get_conn(self) -> Elasticsearch: ...  # Note: this is a cached_property in the implementation
    def search(self, query: dict[Any, Any], index: str = "_all") -> dict: ...
```
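For operations beyond `search`, the underlying client can be used directly. A minimal sketch, assuming the elasticsearch-py 8.x client API and an illustrative index name `logs-test` (note that `get_conn` is a cached property, so it is accessed without parentheses):

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
es = hook.get_conn  # cached_property: no call parentheses

# Index a document and count documents using the native client.
es.index(index="logs-test", document={"message": "hello from airflow"})
doc_count = es.count(index="logs-test")["count"]
print(f"logs-test now holds {doc_count} documents")
```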
Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns.

```python
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    def __init__(
        self,
        base_log_folder: str,
        end_of_log_mark: str,
        write_stdout: bool,
        json_format: bool,
        json_fields: str,
        write_to_es: bool = False,
        target_index: str = "airflow-logs",
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "http://localhost:9200",
        frontend: str = "localhost:5601",
        index_patterns: str = ...,
        index_patterns_callable: str = "",
        es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
        **kwargs,
    ): ...
    def emit(self, record): ...
    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: ...
    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: ...
```
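In normal use Airflow constructs this handler from the `[elasticsearch]` configuration shown earlier, but a direct instantiation sketch illustrates how the constructor arguments line up with those settings (all values below are illustrative, not defaults guaranteed by the provider):

```python
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# Illustrative only: Airflow normally builds this handler from airflow.cfg.
handler = ElasticsearchTaskHandler(
    base_log_folder="/tmp/airflow/logs",  # local fallback log folder
    end_of_log_mark="end_of_log",         # marker appended when a task finishes
    write_stdout=False,                   # do not mirror logs to stdout
    json_format=True,                     # emit structured JSON log lines
    json_fields="asctime, filename, lineno, levelname, message",
    write_to_es=True,                     # ship logs to Elasticsearch
    target_index="airflow-logs",          # matches target_index in airflow.cfg
    host="http://localhost:9200",
    frontend="localhost:5601",            # Kibana host used for external log URLs
)
```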
Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination.

```python
def connect(host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any) -> ESConnection: ...

class ESConnection:
    def __init__(self, host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any): ...
    def cursor(self) -> ElasticsearchSQLCursor: ...
    def execute_sql(self, query: str, params: Iterable | Mapping[str, Any] | None = None) -> ObjectApiResponse: ...
```
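A minimal sketch of using `connect` directly, outside of an Airflow connection (host, port, and the queried index are illustrative):

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Build an ESConnection without going through an Airflow connection object.
conn = connect(host="localhost", port=9200, scheme="http")

# execute_sql sends the query to the Elasticsearch SQL API and returns the raw response.
response = conn.execute_sql("SELECT * FROM my_index LIMIT 10")
print(response)
```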
Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility.

```python
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool  # Version compatibility flag
AIRFLOW_V_3_1_PLUS: bool  # Version compatibility flag
BaseHook: type            # Base hook class, conditionally imported
EsLogMsgType: type        # Type alias for log message types
```
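A minimal sketch of how these utilities can gate version-specific behavior in user code (the branch contents are illustrative):

```python
from airflow.providers.elasticsearch.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    get_base_airflow_version_tuple,
)

print(f"Running on Airflow {get_base_airflow_version_tuple()}")

if AIRFLOW_V_3_0_PLUS:
    # Illustrative: take the Airflow 3.x code path.
    print("Airflow 3.0+ detected")
else:
    # Illustrative: fall back to Airflow 2.x behavior.
    print("Airflow 2.x detected")
```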