# Elasticsearch Python Hook

Native Elasticsearch operations using the official Elasticsearch Python client. The hook offers a convenience `search()` method and, through `get_conn`, direct access to the full client API: search, indexing, cluster management, and advanced features such as aggregations and machine learning.

## Capabilities

### Python Hook Class

Hook class that wraps the native Elasticsearch Python client; every client operation is available through the client returned by `get_conn`.

```python { .api }
class ElasticsearchPythonHook(BaseHook):
    """
    Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.

    :param hosts: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]
    :param es_conn_args: Additional arguments you might need to enter to connect to Elasticsearch.
        Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
    """

    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):
        """
        Initialize the Elasticsearch Python Hook.

        Parameters:
        - hosts: List of Elasticsearch host URLs
        - es_conn_args: Optional dictionary of additional connection arguments
          for the Elasticsearch client (authentication, TLS, timeouts, ...)
        """

    @cached_property
    def get_conn(self) -> Elasticsearch:
        """
        Return the Elasticsearch client (cached).

        This is implemented as a cached_property, so the client connection
        is created once and reused for subsequent accesses. Access it as an
        attribute (``hook.get_conn``), without calling it.

        Returns:
        Cached Elasticsearch client instance with connection configured
        """

    def search(self, query: dict[Any, Any], index: str = "_all") -> dict:
        """
        Return results matching a query using Elasticsearch DSL.

        Parameters:
        - query: The query you want to run as a dictionary
        - index: The index (or index pattern) you want to query (default: "_all")

        Returns:
        Only the 'hits' object of the Elasticsearch response, i.e.
        ``{"total": {...}, "max_score": ..., "hits": [...]}``. Other top-level
        sections of the response, such as 'aggregations', are not included;
        use ``get_conn`` to obtain the full response.
        """
```

### Usage Examples

#### Basic Search Operations

```python
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Initialize hook with single host
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"]
)

# Simple search query
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

# search() returns only the "hits" section of the response:
# {"total": {"value": ..., "relation": ...}, "max_score": ..., "hits": [...]}
results = hook.search(query=query, index="logs-*")

# "total" counts all matches (a lower bound when its relation is "gte"), while
# the "hits" list holds only the returned page (10 documents by default).
total = results["total"]
print(f"Found {total['value']} matching documents, {len(results['hits'])} returned")

for hit in results['hits']:
    print(f"Document ID: {hit['_id']}, Score: {hit['_score']}")
    print(f"Source: {hit['_source']}")
```

#### Advanced Search with Authentication

```python
# Initialize with authentication and SSL settings
hook = ElasticsearchPythonHook(
    hosts=["https://elasticsearch.example.com:9200"],
    es_conn_args={
        "basic_auth": ("username", "password"),
        "ca_certs": "/path/to/ca.crt",
        "verify_certs": True,
        "ssl_show_warn": False
    }
)

# Complex aggregation query
query = {
    "query": {
        "bool": {
            "must": [
                {"range": {"@timestamp": {"gte": "2024-01-01"}}},
                {"term": {"service.name": "api"}}
            ]
        }
    },
    "aggs": {
        "status_codes": {
            "terms": {
                "field": "http.response.status_code",
                "size": 10
            }
        }
    },
    "size": 0
}

# hook.search() returns only the "hits" section of the response, so it cannot
# be used to read aggregations. Call the client directly to get the full
# response; the top-level keys of `query` ("query", "aggs", "size") map onto
# keyword arguments of the client's search() method.
response = hook.get_conn.search(index="logs-2024-*", **query)
status_codes = response["aggregations"]["status_codes"]["buckets"]

for bucket in status_codes:
    print(f"Status {bucket['key']}: {bucket['doc_count']} occurrences")
```

#### Using Native Elasticsearch Client

```python
# Get direct access to the Elasticsearch client
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    # "request_timeout" replaces the client's deprecated "timeout" option
    es_conn_args={"request_timeout": 30}
)

# get_conn is a cached property: access it without parentheses
es_client = hook.get_conn

# Index a document
doc = {
    "timestamp": "2024-01-01T12:00:00",
    "message": "Application started",
    "level": "INFO"
}

response = es_client.index(
    index="application-logs",
    document=doc
)
print(f"Document indexed with ID: {response['_id']}")

# Get cluster health
health = es_client.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Number of nodes: {health['number_of_nodes']}")

# Create an index with mapping. Creating an index that already exists raises
# an error, so check first to keep the example re-runnable.
mappings = {
    "properties": {
        "timestamp": {"type": "date"},
        "message": {"type": "text"},
        "level": {"type": "keyword"}
    }
}

if not es_client.indices.exists(index="custom-logs"):
    es_client.indices.create(index="custom-logs", mappings=mappings)
```

#### Bulk Operations

```python
from elasticsearch import helpers

hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
es_client = hook.get_conn

# Bulk index documents
docs = [
    {
        "_index": "bulk-logs",
        "_source": {
            "timestamp": "2024-01-01T12:00:00",
            "message": f"Log message {i}",
            "level": "INFO"
        }
    }
    for i in range(1000)
]

# Use helpers for bulk operations. By default helpers.bulk() raises
# BulkIndexError when any document fails, so the returned error list would
# always be empty; raise_on_error=False collects the per-document errors instead.
success, failed = helpers.bulk(es_client, docs, raise_on_error=False)
print(f"Successfully indexed: {success}, Failed: {len(failed)}")
```

### Connection Configuration

The hook accepts connection arguments through the `es_conn_args` parameter; any option supported by the Elasticsearch client can be supplied:

#### Authentication
```python
# Basic authentication
es_conn_args = {
    "basic_auth": ("username", "password")
}

# API Key authentication: an (id, api_key) tuple or the encoded API key string
es_conn_args = {
    "api_key": ("api_key_id", "api_key_secret")
}

# Bearer token authentication
es_conn_args = {
    "bearer_auth": "bearer_token_string"
}
```

#### SSL/TLS Configuration
```python
es_conn_args = {
    # CA bundle used to verify the server certificate
    "ca_certs": "/path/to/ca.pem",
    # Client certificate and key, for mutual TLS
    "client_cert": "/path/to/client.pem",
    "client_key": "/path/to/client-key.pem",
    "verify_certs": True,
    # Controls the warning shown when certificate verification is disabled
    "ssl_show_warn": False
}
```

#### Connection Tuning
```python
es_conn_args = {
    # Per-request timeout in seconds; replaces the deprecated "timeout" option
    "request_timeout": 30,
    "max_retries": 3,
    "retry_on_timeout": True,
    "http_compress": True,
    "headers": {"User-Agent": "My-App/1.0"}
}
```

### Notes

- The hook provides access to the full Elasticsearch Python client API through `get_conn`
- All Elasticsearch operations are supported (search, index, delete, cluster management, etc.)
- The client is cached using `@cached_property`: it is created on first access and reused afterwards, and `get_conn` is accessed as an attribute (`hook.get_conn`), not called
- `es_conn_args` accepts any Elasticsearch client configuration option
- For basic search operations, use the `search()` method, which returns only the `hits` section of the response; for aggregations and other advanced operations, use `get_conn` to access the client directly