Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
npx @tessl/cli install tessl/pypi-apache-airflow-providers-elasticsearch@6.3.00
# Apache Airflow Elasticsearch Provider

A provider package that enables Elasticsearch integration for Apache Airflow workflows. This package provides hooks for connecting to Elasticsearch clusters, logging capabilities that write task logs directly to Elasticsearch indexes, and SQL query execution against Elasticsearch using the SQL API.

## Package Information

- **Package Name**: apache-airflow-providers-elasticsearch
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-elasticsearch`
- **Version**: 6.3.2
- **Requirements**:
  - apache-airflow >= 2.10.0
  - elasticsearch >= 8.10, < 9
  - apache-airflow-providers-common-sql >= 1.27.0

## Core Imports

Standard imports for using the provider:

```python
# Main package version
from airflow.providers.elasticsearch import __version__

# SQL Hook for database-like operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Python Hook for native Elasticsearch operations
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Task logging handler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# JSON formatter for structured logging
from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter

# Response classes for handling Elasticsearch results
from airflow.providers.elasticsearch.log.es_response import (
    AttributeList, AttributeDict, Hit, HitMeta, ElasticSearchResponse
)

# Version compatibility utilities
from airflow.providers.elasticsearch.version_compat import (
    get_base_airflow_version_tuple, AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS,
    BaseHook, EsLogMsgType
)
```

Connection components:

```python
# Direct connection utilities
from airflow.providers.elasticsearch.hooks.elasticsearch import connect, ESConnection, ElasticsearchSQLCursor
```

## Basic Usage

### Setting up an Elasticsearch Connection

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Using Airflow connection
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
conn = hook.get_conn()

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index LIMIT 10")
print(result)
```
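
The object printed above is the raw SQL API response. The following is a minimal sketch of turning such a response into a list of dictionaries, assuming it uses Elasticsearch's default JSON SQL format with `columns` and `rows` keys. `rows_to_dicts` is a hypothetical helper that is not part of the provider, and the sample response is hand-written for illustration:

```python
from typing import Any, Mapping


def rows_to_dicts(response: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Pair each row with the column names reported in the response."""
    names = [col["name"] for col in response["columns"]]
    return [dict(zip(names, row)) for row in response["rows"]]


# Hand-written sample shaped like a SQL API response (illustrative only).
sample = {
    "columns": [
        {"name": "host", "type": "keyword"},
        {"name": "status", "type": "long"},
    ],
    "rows": [["web-1", 200], ["web-2", 500]],
}

records = rows_to_dicts(sample)
print(records)
```

Keeping the conversion in a small pure function makes it easy to unit test without a running cluster.
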

### Basic Search Operations

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Initialize with hosts
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    es_conn_args={"basic_auth": ("user", "password")}
)

# Perform search
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

results = hook.search(query=query, index="logs-*")
print(f"Found {len(results['hits'])} results")
```
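
Assuming, as the example above implies, that `search` returns the `hits` section of the Elasticsearch response, `len(results['hits'])` counts only the documents returned on this page, not the total number of matches. The sketch below separates the two. `summarize_hits` is a hypothetical helper, and the sample dictionary is hand-written to mimic the standard `hits` layout:

```python
from typing import Any


def summarize_hits(hits: dict[str, Any]) -> dict[str, Any]:
    """Extract the total match count and the returned documents."""
    total = hits.get("total", {})
    # Recent Elasticsearch versions report total as {"value": N, "relation": ...};
    # fall back to a plain integer if that is what the response contains.
    count = total.get("value") if isinstance(total, dict) else total
    sources = [doc.get("_source", {}) for doc in hits.get("hits", [])]
    return {"total": count, "returned": len(sources), "sources": sources}


# Hand-written sample (illustrative only).
sample_hits = {
    "total": {"value": 42, "relation": "eq"},
    "max_score": 1.3,
    "hits": [
        {"_index": "logs-2024", "_id": "1", "_source": {"message": "error: disk full"}},
        {"_index": "logs-2024", "_id": "2", "_source": {"message": "error: timeout"}},
    ],
}

summary = summarize_hits(sample_hits)
print(f"{summary['returned']} of {summary['total']} matches returned")
```
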

### Logging Configuration

```ini
# In airflow.cfg
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
```
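
As a quick sanity check, the fragment above can be parsed with Python's standard `configparser` to confirm which options are booleans and which are strings. This is only an illustrative sketch: Airflow reads its configuration through its own configuration layer, and the `settings` dictionary here is a made-up name for the example.

```python
import configparser

# Same options as the airflow.cfg fragment above.
CFG_TEXT = """
[elasticsearch]
host = localhost:9200
write_to_es = True
target_index = airflow-logs
json_format = True
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)
section = parser["elasticsearch"]

settings = {
    "host": section.get("host"),
    "write_to_es": section.getboolean("write_to_es"),
    "target_index": section.get("target_index"),
    "json_format": section.getboolean("json_format"),
}
print(settings)
```
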

## Architecture

The provider follows Airflow's standard provider architecture with three main components:

- **Hooks**: Provide connection interfaces to Elasticsearch clusters
- **Logging**: Integrate Elasticsearch as a logging backend for task outputs
- **Version Compatibility**: Handle version differences between Airflow releases

The package supports both SQL-based queries through the Elasticsearch SQL API and native Elasticsearch operations through the Python client, with configurable logging that can write task outputs to Elasticsearch indexes with optional JSON formatting and Kibana frontend integration.

## Capabilities

### Elasticsearch SQL Hook

Database-like interface using Elasticsearch's SQL API with PEP 249 compliance. Supports connection management, cursor operations, and SQL query execution against Elasticsearch indexes.

```python { .api }
class ElasticsearchSQLHook(DbApiHook):
    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs): ...
    def get_conn(self) -> ESConnection: ...
    def get_uri(self) -> str: ...
```

[SQL Hook](./sql-hook.md)

### Elasticsearch Python Hook

Native Elasticsearch operations using the official Python client. Provides direct access to Elasticsearch APIs for search, indexing, and cluster management operations.

```python { .api }
class ElasticsearchPythonHook(BaseHook):
    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None): ...
    def get_conn(self) -> Elasticsearch: ...  # Note: This is a cached_property in implementation
    def search(self, query: dict[Any, Any], index: str = "_all") -> dict: ...
```

[Python Hook](./python-hook.md)

### Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns.

```python { .api }
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    def __init__(
        self,
        base_log_folder: str,
        end_of_log_mark: str,
        write_stdout: bool,
        json_format: bool,
        json_fields: str,
        write_to_es: bool = False,
        target_index: str = "airflow-logs",
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "http://localhost:9200",
        frontend: str = "localhost:5601",
        index_patterns: str = ...,
        index_patterns_callable: str = "",
        es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
        **kwargs
    ): ...
    def emit(self, record): ...
    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: ...
    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: ...
```

[Task Logging](./task-logging.md)
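
To illustrate what JSON-formatted log output could look like, here is a minimal sketch built only on the standard `logging` module. It assumes `json_fields` is a comma-separated list of log record attribute names (the signature above only shows that it is a string). `SimpleJSONFormatter` is a hypothetical class for this example and is not the provider's `ElasticsearchJSONFormatter`:

```python
import json
import logging


class SimpleJSONFormatter(logging.Formatter):
    """Hypothetical formatter: emit only the selected record fields as JSON."""

    def __init__(self, json_fields: str) -> None:
        super().__init__()
        # Assumption: fields arrive as one comma-separated string.
        self.fields = [name.strip() for name in json_fields.split(",") if name.strip()]

    def format(self, record: logging.LogRecord) -> str:
        # Render the final message so it can be selected like any other field.
        record.message = record.getMessage()
        payload = {name: getattr(record, name, None) for name in self.fields}
        return json.dumps(payload)


record = logging.LogRecord(
    name="airflow.task",
    level=logging.ERROR,
    pathname="example.py",
    lineno=1,
    msg="task %s failed",
    args=("extract",),
    exc_info=None,
)
line = SimpleJSONFormatter("levelname, name, message").format(record)
print(line)
```

Emitting one JSON object per line keeps each log entry independently parseable, which is the usual reason for enabling a JSON format before shipping logs to a search index.
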
174
175
### Connection Management
176
177
Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination.
178
179
```python { .api }
180
def connect(host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any) -> ESConnection: ...
181
182
class ESConnection:
183
def __init__(self, host: str = "localhost", port: int = 9200, user: str | None = None, password: str | None = None, scheme: str = "http", **kwargs: Any): ...
184
def cursor(self) -> ElasticsearchSQLCursor: ...
185
def execute_sql(self, query: str, params: Iterable | Mapping[str, Any] | None = None) -> ObjectApiResponse: ...
186
```
187
188
[Connection Management](./connection-management.md)
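
The `host`, `port`, and `scheme` parameters of `connect` together describe a cluster endpoint. The sketch below shows one way such parameters could be combined into a base URL using the standard library. `build_base_url` is a hypothetical helper written for this example; it is not the provider's implementation and deliberately ignores `user` and `password`:

```python
from urllib.parse import urlunparse


def build_base_url(host: str = "localhost", port: int = 9200, scheme: str = "http") -> str:
    """Combine scheme, host, and port into a base URL (illustrative only)."""
    netloc = f"{host}:{port}"
    # urlunparse expects (scheme, netloc, path, params, query, fragment).
    return urlunparse((scheme, netloc, "/", "", "", ""))


print(build_base_url())
print(build_base_url("es.internal", 9243, "https"))
```
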
189
190
### Version Compatibility
191
192
Version compatibility utilities that handle differences between Airflow releases and provide conditional imports for cross-version compatibility.
193
194
```python { .api }
195
def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...
196
197
AIRFLOW_V_3_0_PLUS: bool # Version compatibility flag
198
AIRFLOW_V_3_1_PLUS: bool # Version compatibility flag
199
BaseHook: type # Base hook class, conditionally imported
200
EsLogMsgType: type # Type alias for log message types
201
```
202
203
[Version Compatibility](./version-compatibility.md)
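
The idea behind a version tuple and boolean flags can be sketched with plain Python. This is an assumption-laden illustration, not the provider's code: `parse_version_tuple`, `V_3_0_PLUS`, and `V_3_1_PLUS` are made-up names, and the version string is a stand-in for whatever Airflow version is installed.

```python
import re


def parse_version_tuple(version: str) -> tuple[int, int, int]:
    """Reduce a version string to (major, minor, micro), dropping suffixes."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    major, minor, micro = (int(part) for part in match.groups())
    return major, minor, micro


# Stand-in for the installed Airflow version (illustrative only).
current = parse_version_tuple("3.0.2.dev0")

# Tuples compare element by element, so flags reduce to simple comparisons.
V_3_0_PLUS = current >= (3, 0, 0)
V_3_1_PLUS = current >= (3, 1, 0)
print(current, V_3_0_PLUS, V_3_1_PLUS)
```

Code that must run on several Airflow releases can then branch on such flags, for example to choose between two import paths for the same class.
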