Files

  • docs/
    • connection-management.md
    • index.md
    • python-hook.md
    • sql-hook.md
    • task-logging.md
    • version-compatibility.md
  • tile.json

tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/apache-airflow-providers-elasticsearch@6.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-elasticsearch@6.3.0


Apache Airflow Elasticsearch Provider

A provider package that enables Elasticsearch integration for Apache Airflow workflows. This package provides hooks for connecting to Elasticsearch clusters, logging capabilities that write task logs directly to Elasticsearch indexes, and SQL query execution against Elasticsearch using the SQL API.

Package Information

  • Package Name: apache-airflow-providers-elasticsearch
  • Package Type: pip
  • Language: Python
  • Installation: pip install apache-airflow-providers-elasticsearch
  • Version: 6.3.2
  • Requirements:
    • apache-airflow >= 2.10.0
    • elasticsearch >= 8.10, < 9
    • apache-airflow-providers-common-sql >= 1.27.0

Core Imports

Standard imports for using the provider:

# Main package version
from airflow.providers.elasticsearch import __version__

# SQL Hook for database-like operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Python Hook for native Elasticsearch operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Task logging handler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# JSON formatter for structured logging
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter

# Response classes for handling Elasticsearch results
from airflow.providers.elasticsearch.log.es_response import (
    AttributeList, AttributeDict, Hit, HitMeta, ElasticSearchResponse
)

# Version compatibility utilities
from airflow.providers.elasticsearch.version_compat import (
    get_base_airflow_version_tuple, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, 
    BaseHook, EsLogMsgType
)

Connection components:

# Direct connection utilities
from airflow.providers.elasticsearch.hooks.elasticsearch import connect, ESConnection, ElasticsearchSQLCursor

Basic Usage

Setting up an Elasticsearch Connection

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Using Airflow connection
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
conn = hook.get_conn()

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index LIMIT 10")
print(result)

Basic Search Operations

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Initialize with hosts
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    es_conn_args={"basic_auth": ("user", "password")}
)

# Perform search
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

results = hook.search(query=query, index="logs-*")
print(f"Found {len(results['hits'])} results")

Logging Configuration

# In airflow.cfg
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
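
These options can also be supplied through Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable convention instead of editing `airflow.cfg`, for example:

```shell
# Environment-variable equivalents of the [elasticsearch] settings above
export AIRFLOW__ELASTICSEARCH__HOST="localhost:9200"
export AIRFLOW__ELASTICSEARCH__WRITE_TO_ES="True"
export AIRFLOW__ELASTICSEARCH__TARGET_INDEX="airflow-logs"
export AIRFLOW__ELASTICSEARCH__JSON_FORMAT="True"
```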

Architecture

The provider follows Airflow's standard provider architecture with three main components:

  • Hooks: Provide connection interfaces to Elasticsearch clusters
  • Logging: Integrate Elasticsearch as a logging backend for task outputs
  • Version Compatibility: Handle version differences between Airflow releases

The package supports both SQL-style queries through the Elasticsearch SQL API and native operations through the official Python client. Logging is configurable: task output can be written to Elasticsearch indexes with optional JSON formatting and surfaced through a Kibana frontend.

Capabilities

Elasticsearch SQL Hook

Database-like interface using Elasticsearch's SQL API with PEP 249 compliance. Supports connection management, cursor operations, and SQL query execution against Elasticsearch indexes.

class ElasticsearchSQLHook(DbApiHook):
    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs): ...
    def get_conn(self) -> ESConnection: ...
    def get_uri(self) -> str: ...
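
For orientation, the hook ultimately drives Elasticsearch's `_sql` endpoint, which accepts a JSON body containing the query text. A minimal sketch of such a request body (field names come from the Elasticsearch SQL API itself, not from this provider's source):

```python
import json

def build_sql_request(query: str, fetch_size: int = 1000) -> dict:
    """Request body for Elasticsearch's POST /_sql endpoint.

    fetch_size caps the number of rows returned per page; further pages
    are retrieved with the cursor value the response returns.
    """
    return {"query": query, "fetch_size": fetch_size}

print(json.dumps(build_sql_request("SELECT * FROM my_index LIMIT 10")))
```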

SQL Hook

Elasticsearch Python Hook

Native Elasticsearch operations using the official Python client. Provides direct access to Elasticsearch APIs for search, indexing, and cluster management operations.

class ElasticsearchPythonHook(BaseHook):
    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None): ...
    def get_conn(self) -> Elasticsearch: ...  # Note: This is a cached_property in implementation
    def search(self, query: dict[Any, Any], index: str = "_all") -> dict: ...

Python Hook

Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns.

class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    def __init__(
        self,
        base_log_folder: str,
        end_of_log_mark: str,
        write_stdout: bool,
        json_format: bool,
        json_fields: str,
        write_to_es: bool = False,
        target_index: str = "airflow-logs",
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "http://localhost:9200",
        frontend: str = "localhost:5601",
        index_patterns: str = ...,
        index_patterns_callable: str = "",
        es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
        **kwargs
    ): ...
    def emit(self, record): ...
    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: ...
    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: ...

Task Logging

Connection Management

Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination.

def connect(host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any) -> ESConnection: ...

class ESConnection:
    def __init__(self, host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any): ...
    def cursor(self) -> ElasticsearchSQLCursor: ...
    def execute_sql(self, query: str, params: Iterable | Mapping[str, Any] | None = None) -> ObjectApiResponse: ...
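
A minimal usage sketch, assuming a cluster reachable at localhost:9200. The call is wrapped in a function so nothing connects (and Airflow is not imported) at import time:

```python
def fetch_sql(sql: str):
    """Run a SQL query against a local cluster via the provider's connect()."""
    # Deferred import: requires apache-airflow-providers-elasticsearch.
    from airflow.providers.elasticsearch.hooks.elasticsearch import connect

    # connect() mirrors ESConnection's signature shown above.
    conn = connect(host="localhost", port=9200, scheme="http")
    return conn.execute_sql(sql)
```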

Connection Management

Version Compatibility

Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility.

def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...

AIRFLOW_V_3_0_PLUS: bool  # Version compatibility flag
AIRFLOW_V_3_1_PLUS: bool  # Version compatibility flag
BaseHook: type  # Base hook class, conditionally imported
EsLogMsgType: type  # Type alias for log message types
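
As an illustration of what the version helper returns, here is a hedged sketch of the parsing involved — a stand-in for comparison purposes, not the provider's actual implementation:

```python
def parse_version_tuple(version: str) -> tuple[int, int, int]:
    """Parse an 'X.Y.Z' version string into a 3-tuple of ints,
    suitable for comparisons like the AIRFLOW_V_3_0_PLUS flag."""
    major, minor, patch = version.split(".")[:3]
    return (int(major), int(minor), int(patch))

# Comparable to how a "3.0+" flag could be derived from a version tuple.
is_3_0_plus = parse_version_tuple("3.0.1") >= (3, 0, 0)
print(is_3_0_plus)
```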

Version Compatibility