# Elasticsearch SQL Hook

A database-like interface to Elasticsearch that provides PEP 249 (DB-API) style access through Elasticsearch's SQL API. The hook runs SQL queries against Elasticsearch indices through a connection and cursor, and builds that connection from an Airflow connection.

## Capabilities

### SQL Hook Class

Main hook class that extends Airflow's `DbApiHook` to provide database-like access to Elasticsearch clusters through the SQL API.

```python { .api }
class ElasticsearchSQLHook(DbApiHook):
    """
    Interact with Elasticsearch through the elasticsearch-dbapi.

    This hook uses the Elasticsearch conn_id.

    :param elasticsearch_conn_id: The Elasticsearch connection ID used for Elasticsearch credentials.
    """

    conn_name_attr = "elasticsearch_conn_id"
    default_conn_name = "elasticsearch_default"
    connector = ESConnection
    conn_type = "elasticsearch"
    hook_name = "Elasticsearch"

    def __init__(self, schema: str = "http", connection: AirflowConnection | None = None, *args, **kwargs):
        """
        Initialize the Elasticsearch SQL Hook.

        Parameters:
        - schema: Connection scheme (default: "http")
        - connection: Optional Airflow connection object to use instead of looking up elasticsearch_conn_id
        - *args, **kwargs: Passed on to DbApiHook; supply elasticsearch_conn_id as a keyword argument
        """

    def get_conn(self) -> ESConnection:
        """
        Return an Elasticsearch connection object.

        Returns:
            ESConnection configured with the Airflow connection's parameters
        """

    def get_uri(self) -> str:
        """
        Return the connection URI string.

        Returns:
            String representation of the connection URI
        """

    def _get_polars_df(
        self,
        sql: str,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        **kwargs,
    ):
        """
        Get a Polars DataFrame from an SQL query (not supported).

        Parameters:
        - sql: SQL query string
        - parameters: Query parameters
        - **kwargs: Additional arguments

        Raises:
            NotImplementedError: Polars is not supported for Elasticsearch
        """
```

### Usage Examples

#### Basic Hook Usage

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

# Pass the connection ID as a keyword argument: the first positional parameter is `schema`
hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')

# Get a connection and execute a query through the SQL API
conn = hook.get_conn()
try:
    result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

    # Each entry in 'rows' is a list of values in column order
    for row in result['rows']:
        print(row)
finally:
    # Release the underlying Elasticsearch client
    conn.close()
```
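The SQL API returns column metadata and values separately. `columns` is a list of `{"name", "type"}` objects, and each entry in `rows` is a list of values in the same order. The following is a minimal sketch for turning such a response into one dictionary per row. It assumes `result` has that shape, which holds for a response that carries column metadata, such as the first page of a result. `rows_to_dicts` is a hypothetical helper and is not part of the provider.

```python
from __future__ import annotations

from typing import Any, Mapping


def rows_to_dicts(result: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Hypothetical helper: pair each row's values with the response's column names.

    Assumes the SQL API response shape: "columns" is a list of
    {"name": ..., "type": ...} objects and "rows" is a list of value lists
    in the same column order.
    """
    names = [column["name"] for column in result["columns"]]
    return [dict(zip(names, row)) for row in result["rows"]]


# Usage with a hand-written response in that shape (no cluster needed)
sample = {
    "columns": [{"name": "id", "type": "long"}, {"name": "status", "type": "keyword"}],
    "rows": [[1, "active"], [2, "active"]],
}
print(rows_to_dicts(sample))
# [{'id': 1, 'status': 'active'}, {'id': 2, 'status': 'active'}]
```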

#### Connection URI Generation

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook

hook = ElasticsearchSQLHook(elasticsearch_conn_id='my_elasticsearch_conn')
uri = hook.get_uri()
print(f"Connection URI: {uri}")
# Example output (depends on the connection's fields):
# elasticsearch+http://user:password@localhost:9200/?param=value
```
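The example output follows the layout `<conn_type>+<scheme>://<login>:<password>@<host>:<port>/?<key>=<value>&...`. Below is a minimal sketch of how a URI in that layout could be assembled from individual connection fields. It assumes three things: login and password are escaped with `quote_plus`, credentials are omitted when there is no login, and Extra entries are appended in order. `build_es_sql_uri` is hypothetical and is not the provider's implementation.

```python
from __future__ import annotations

from urllib.parse import quote_plus


def build_es_sql_uri(
    conn_type: str,
    scheme: str,
    host: str,
    port: int | None = None,
    login: str | None = None,
    password: str | None = None,
    extra: dict[str, object] | None = None,
) -> str:
    """Hypothetical helper that assembles a URI in the layout shown above."""
    # Optional credentials, escaped so characters such as ':' or '@' cannot break the URI
    credentials = ""
    if login:
        credentials = f"{quote_plus(login)}:{quote_plus(password or '')}@"
    netloc = host if port is None else f"{host}:{port}"
    uri = f"{conn_type}+{scheme}://{credentials}{netloc}/"
    # Each Extra entry becomes a key=value pair in the query string
    if extra:
        uri += "?" + "&".join(f"{key}={value}" for key, value in extra.items())
    return uri


print(build_es_sql_uri("elasticsearch", "http", "localhost", 9200, "user", "password", {"param": "value"}))
# elasticsearch+http://user:password@localhost:9200/?param=value
```

This sketch appends Extra values without escaping them. If values may contain `&` or `=`, `urllib.parse.urlencode` is a safer way to build the query string.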

#### Inheritance and Extension

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchSQLHook


class CustomElasticsearchHook(ElasticsearchSQLHook):
    def custom_query_method(self, index_pattern: str):
        # index_pattern is interpolated into the SQL text as-is, so pass only trusted values
        conn = self.get_conn()
        return conn.execute_sql(f"SELECT * FROM {index_pattern}")
```
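`custom_query_method` interpolates `index_pattern` directly into the statement, so the value must come from a trusted source. One way to narrow the risk is to validate the pattern and emit it as a double-quoted identifier. Elasticsearch SQL accepts quoted identifiers, which also lets patterns such as `logs-*` through. The sketch below assumes a conservative character allowlist; widen it if your index names need other characters. `quote_index_pattern` is a hypothetical helper.

```python
import re

# Conservative allowlist (an assumption): letters, digits, '_', '.', '-',
# '*' wildcards and ':' for cross-cluster prefixes. Anything else is rejected.
_INDEX_PATTERN = re.compile(r"[A-Za-z0-9_.*:-]+")


def quote_index_pattern(index_pattern: str) -> str:
    """Hypothetical helper: validate an index pattern and return it double-quoted."""
    if not _INDEX_PATTERN.fullmatch(index_pattern):
        raise ValueError(f"Unsafe index pattern: {index_pattern!r}")
    return f'"{index_pattern}"'


query = f"SELECT * FROM {quote_index_pattern('logs-*')} LIMIT 10"
print(query)  # SELECT * FROM "logs-*" LIMIT 10
```

Inside the subclass, `conn.execute_sql(f"SELECT * FROM {quote_index_pattern(index_pattern)}")` would then reject input such as `logs" OR 1=1` before it ever reaches the cluster.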

### Configuration

The hook reads the following fields from the Airflow connection:

- **Host**: Elasticsearch server hostname
- **Port**: Elasticsearch server port (default: 9200)
- **Login**: Username for authentication (optional)
- **Password**: Password for authentication (optional)
- **Schema**: URL scheme, `http` or `https`
- **Extra**: Additional connection parameters as a JSON object
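Below is a minimal sketch of how these fields could be turned into keyword arguments for a connection. It rests on four assumptions: empty login and password mean "no authentication", the port falls back to 9200, the scheme falls back to `http`, and the keys parsed from Extra are merged on top. `connection_kwargs` and the keyword names it produces (`host`, `port`, `user`, `password`, `scheme`) are hypothetical illustrations of the mapping the hook performs internally.

```python
from __future__ import annotations

import json
from typing import Any


def connection_kwargs(
    host: str,
    port: int | None = None,
    login: str | None = None,
    password: str | None = None,
    schema: str | None = None,
    extra: str | None = None,
) -> dict[str, Any]:
    """Hypothetical mapping from Airflow connection fields to connection kwargs."""
    kwargs: dict[str, Any] = {
        "host": host,
        "port": port or 9200,            # documented default port
        "user": login or None,           # empty string means "no authentication"
        "password": password or None,
        "scheme": schema or "http",      # the Schema field carries http/https
    }
    # Extra is stored as a JSON string; its keys are merged on top of the basics
    if extra:
        kwargs.update(json.loads(extra))
    return kwargs


print(connection_kwargs("localhost", login="user", password="secret", schema="https",
                        extra='{"verify_certs": false}'))
```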

Example connection extra parameters:

```json
{
  "http_compress": true,
  "verify_certs": false,
  "fetch_size": 1000,
  "field_multi_value_leniency": true
}
```
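The keys in this example serve two different consumers. `http_compress` and `verify_certs` are options of the Elasticsearch HTTP client. `fetch_size` (rows per page, default 1000) and `field_multi_value_leniency` (tolerate multi-valued fields, default `false`) are SQL API request-body options. The sketch below separates the two groups. It assumes exactly those two keys belong to the SQL request and every other key is meant for the client. `split_extras` is a hypothetical helper and is not provider API.

```python
from __future__ import annotations

from typing import Any

# Keys treated as SQL API request-body options (an assumption for this sketch)
SQL_BODY_KEYS = ("fetch_size", "field_multi_value_leniency")
# Elasticsearch SQL API defaults for those options
SQL_BODY_DEFAULTS = {"fetch_size": 1000, "field_multi_value_leniency": False}


def split_extras(extra: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Hypothetical helper: return (client_kwargs, sql_body_options)."""
    client_kwargs = {k: v for k, v in extra.items() if k not in SQL_BODY_KEYS}
    body = dict(SQL_BODY_DEFAULTS)
    body.update({k: v for k, v in extra.items() if k in SQL_BODY_KEYS})
    return client_kwargs, body


extra = {"http_compress": True, "verify_certs": False, "fetch_size": 1000, "field_multi_value_leniency": True}
client_kwargs, sql_body = split_extras(extra)
print(client_kwargs)  # {'http_compress': True, 'verify_certs': False}
print(sql_body)       # {'fetch_size': 1000, 'field_multi_value_leniency': True}
```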

### Notes

- The hook exposes a PEP 249 (DB-API) interface through the `ESConnection` wrapper
- Polars DataFrames are not supported: `_get_polars_df` raises `NotImplementedError`
- All SQL operations go through Elasticsearch's SQL API
- Connection parameters are extracted automatically from the Airflow connection
- The hook inherits the generic methods of Airflow's `DbApiHook` base class; because the Elasticsearch SQL API is read-only, only query-oriented methods apply
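The PEP 249 surface comes down to a cursor with two jobs. Its `execute()` sends the statement to the SQL API, and its `fetch*()` methods walk the returned `rows`, following the API's `cursor` token when a result spans several pages. The sketch below illustrates that idea with a hypothetical `SqlApiCursor`. The cursor talks to a caller-supplied `send(body)` function, which stands in for the Elasticsearch client's SQL query call. It is not the provider's `ESConnection` cursor, and `?` placeholders are filled from the request's `params` list.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator

# send(body) -> response dict; a stand-in for a POST to the _sql endpoint
Send = Callable[[dict[str, Any]], dict[str, Any]]


class SqlApiCursor:
    """Hypothetical PEP 249-style cursor over Elasticsearch SQL API responses."""

    def __init__(self, send: Send, fetch_size: int = 1000) -> None:
        self._send = send
        self._fetch_size = fetch_size
        self.description: list[tuple] | None = None
        self._rows: Iterator[list[Any]] = iter(())

    def execute(self, statement: str, params: list[Any] | None = None) -> None:
        body: dict[str, Any] = {"query": statement, "fetch_size": self._fetch_size}
        if params:
            body["params"] = params  # values for '?' placeholders
        first = self._send(body)
        # PEP 249 description entries have 7 items; only name and type are known here
        self.description = [
            (col["name"], col["type"], None, None, None, None, None)
            for col in first.get("columns", [])
        ]
        self._rows = self._iter_pages(first)

    def _iter_pages(self, page: dict[str, Any]) -> Iterator[list[Any]]:
        while True:
            yield from page.get("rows", [])
            token = page.get("cursor")
            if not token:
                return  # the last page carries no cursor
            page = self._send({"cursor": token})

    def fetchone(self) -> list[Any] | None:
        return next(self._rows, None)

    def fetchall(self) -> list[list[Any]]:
        return list(self._rows)


# Fake two-page server for demonstration (no cluster needed)
def fake_send(body: dict[str, Any]) -> dict[str, Any]:
    if "query" in body:
        return {"columns": [{"name": "id", "type": "long"}], "rows": [[1], [2]], "cursor": "page-2"}
    return {"rows": [[3]]}


cur = SqlApiCursor(fake_send, fetch_size=2)
cur.execute("SELECT id FROM my_index")
print(cur.fetchone())  # [1]
print(cur.fetchall())  # [[2], [3]]
```

Rows are pulled lazily, so only one page is held at a time. A fuller implementation would also release a server-side cursor that is abandoned before the last page; the SQL API provides a close endpoint for that.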