Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Native Elasticsearch operations using the official Elasticsearch Python client. This hook provides direct access to all Elasticsearch APIs including search, indexing, cluster management, and advanced features like aggregations and machine learning.
Hook class that provides native Elasticsearch client access with full API support for all Elasticsearch operations.
class ElasticsearchPythonHook(BaseHook):
    """
    Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.

    :param hosts: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]
    :param es_conn_args: Additional arguments you might need to enter to connect to Elasticsearch.
        Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
    """

    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):
        """
        Initialize the Elasticsearch Python Hook.

        Parameters:
        - hosts: List of Elasticsearch host URLs
        - es_conn_args: Dictionary of additional connection arguments
        """

    @cached_property
    def get_conn(self) -> Elasticsearch:
        """
        Return the Elasticsearch client (cached).

        This is implemented as a cached_property, so the client connection
        is created once and reused for subsequent calls. Access it as an
        attribute (``hook.get_conn``), not as a method call.

        Returns:
        Cached Elasticsearch client instance with connection configured
        """

    def search(self, query: dict[Any, Any], index: str = "_all") -> dict:
        """
        Return results matching a query using Elasticsearch DSL.

        Only the 'hits' section of the response is returned; use the client
        from ``get_conn`` when other sections of the response are needed.

        Parameters:
        - query: The query you want to run as a dictionary
        - index: The index you want to query (default: "_all")

        Returns:
        The response 'hits' object from Elasticsearch
        """


from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
# Initialize hook with single host
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"]
)

# Simple search query
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

# search() returns the response's 'hits' object: its 'total' entry counts every
# match, while its 'hits' list holds only the page of documents returned.
results = hook.search(query=query, index="logs-*")
hits = results["hits"]
print(f"Found {results['total']['value']} matching documents, showing {len(hits)}")
for hit in hits:
    print(f"Document ID: {hit['_id']}, Score: {hit['_score']}")
    print(f"Source: {hit['_source']}")

# Initialize with authentication and SSL settings
hook = ElasticsearchPythonHook(
    hosts=["https://elasticsearch.example.com:9200"],
    es_conn_args={
        "basic_auth": ("username", "password"),
        "ca_certs": "/path/to/ca.crt",
        "verify_certs": True,
        "ssl_show_warn": False
    }
)

# Complex aggregation query
query = {
    "query": {
        "bool": {
            "must": [
                {"range": {"@timestamp": {"gte": "2024-01-01"}}},
                {"term": {"service.name": "api"}}
            ]
        }
    },
    "aggs": {
        "status_codes": {
            "terms": {
                "field": "http.response.status_code",
                "size": 10
            }
        }
    },
    "size": 0
}

# hook.search() returns only the response's 'hits' object, which does not carry
# aggregation results, so run aggregation queries through the underlying client.
response = hook.get_conn.search(index="logs-2024-*", body=query)
status_codes = response["aggregations"]["status_codes"]["buckets"]
for bucket in status_codes:
    print(f"Status {bucket['key']}: {bucket['doc_count']} occurrences")

# Get direct access to the Elasticsearch client
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    # `request_timeout` replaces the deprecated `timeout` client argument.
    es_conn_args={"request_timeout": 30}
)
# get_conn is a cached property, so it is read as an attribute (no call).
es_client = hook.get_conn

# Index a document
doc = {
    "timestamp": "2024-01-01T12:00:00",
    "message": "Application started",
    "level": "INFO"
}
response = es_client.index(
    index="application-logs",
    document=doc
)
print(f"Document indexed with ID: {response['_id']}")

# Get cluster health
health = es_client.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Number of nodes: {health['number_of_nodes']}")

# Create an index with mapping
mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"}
        }
    }
}
es_client.indices.create(index="custom-logs", body=mapping)

from elasticsearch import helpers
hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
es_client = hook.get_conn

# Bulk index documents
docs = [
    {
        "_index": "bulk-logs",
        "_source": {
            "timestamp": "2024-01-01T12:00:00",
            "message": f"Log message {i}",
            "level": "INFO"
        }
    }
    for i in range(1000)
]

# Use helpers for bulk operations. By default helpers.bulk raises on the first
# batch containing a failed document, so the returned error list would never be
# populated; raise_on_error=False collects the failures and returns them.
success, failed = helpers.bulk(es_client, docs, raise_on_error=False)
print(f"Successfully indexed: {success}, Failed: {len(failed)}")

The hook accepts various connection arguments through the es_conn_args parameter:
# Basic authentication
es_conn_args = {
"basic_auth": ("username", "password")
}
# API Key authentication
es_conn_args = {
"api_key": ("api_key_id", "api_key_secret")
}
# Bearer token authentication
es_conn_args = {
"bearer_auth": "bearer_token_string"
}es_conn_args = {
"ca_certs": "/path/to/ca.pem",
"client_cert": "/path/to/client.pem",
"client_key": "/path/to/client-key.pem",
"verify_certs": True,
"ssl_show_warn": False
}es_conn_args = {
"timeout": 30,
"max_retries": 3,
"retry_on_timeout": True,
"http_compress": True,
"headers": {"User-Agent": "My-App/1.0"}
}@cached_property for performancesearch() method; for advanced operations, use get_conn to access the client directlyInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch