tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities

Elasticsearch Python Hook

Native Elasticsearch operations using the official Elasticsearch Python client. This hook provides direct access to all Elasticsearch APIs including search, indexing, cluster management, and advanced features like aggregations and machine learning.

Capabilities

Python Hook Class

The hook exposes two members: get_conn, a cached property that returns the official Elasticsearch client, and search(), a convenience method that runs a query and returns the 'hits' object of the response.

class ElasticsearchPythonHook(BaseHook):
    """
    Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.
    
    :param hosts: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]
    :param es_conn_args: Additional arguments you might need to enter to connect to Elasticsearch.
                        Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
    """
    
    def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):
        """
        Initialize the Elasticsearch Python Hook.
        
        Parameters:
        - hosts: List of Elasticsearch host URLs
        - es_conn_args: Dictionary of additional connection arguments
        """
        
    @cached_property
    def get_conn(self) -> Elasticsearch:
        """
        Return the Elasticsearch client (cached).
        
        This is implemented as a cached_property, so the client connection
        is created once and reused for subsequent calls.
        
        Returns:
        Cached Elasticsearch client instance with connection configured
        """
        
    def search(self, query: dict[Any, Any], index: str = "_all") -> dict:
        """
        Return results matching a query using Elasticsearch DSL.
        
        Parameters:
        - index: The index you want to query (default: "_all")
        - query: The query you want to run as a dictionary
        
        Returns:
        The response 'hits' object from Elasticsearch
        """

Usage Examples

Basic Search Operations

from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

# Initialize hook with single host
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"]
)

# Simple search query
query = {
    "query": {
        "match": {
            "message": "error"
        }
    }
}

results = hook.search(query=query, index="logs-*")
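# results is the response's 'hits' object: 'total' holds the overall match
# count, while 'hits' lists only the documents returned in this page
print(f"Found {results['total']['value']} matching documents, returned {len(results['hits'])}")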

for hit in results['hits']:
    print(f"Document ID: {hit['_id']}, Score: {hit['_score']}")
    print(f"Source: {hit['_source']}")
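
Since search() returns the response's 'hits' object, a small helper can make its shape explicit. The following is a minimal sketch, not part of the provider: summarize_hits is a hypothetical name, and the sample dictionary is made-up data shaped like an Elasticsearch 7+ hits object, where 'total' is an object with 'value' and 'relation' keys.

```python
from typing import Any


def summarize_hits(hits_obj: dict[str, Any]) -> dict[str, Any]:
    """Condense a 'hits' object into a total count and (id, score) pairs."""
    total = hits_obj.get("total", {})
    # 'total' is normally {"value": N, "relation": ...}; tolerate a bare int too.
    total_value = total.get("value") if isinstance(total, dict) else total
    documents = [(hit["_id"], hit.get("_score")) for hit in hits_obj.get("hits", [])]
    return {"total": total_value, "returned": len(documents), "documents": documents}


# Made-up sample shaped like the value returned by hook.search()
sample_hits = {
    "total": {"value": 42, "relation": "eq"},
    "max_score": 1.3,
    "hits": [
        {"_id": "a1", "_score": 1.3, "_source": {"message": "error: disk full"}},
        {"_id": "b2", "_score": 0.9, "_source": {"message": "error: timeout"}},
    ],
}

summary = summarize_hits(sample_hits)
print(summary["total"], summary["returned"])
```

Keeping the total separate from the number of returned documents avoids confusing the page size with the overall match count.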

Advanced Search with Authentication

# Initialize with authentication and SSL settings
hook = ElasticsearchPythonHook(
    hosts=["https://elasticsearch.example.com:9200"],
    es_conn_args={
        "basic_auth": ("username", "password"),
        "ca_certs": "/path/to/ca.crt",
        "verify_certs": True,
        "ssl_show_warn": False
    }
)

# Complex aggregation query
query = {
    "query": {
        "bool": {
            "must": [
                {"range": {"@timestamp": {"gte": "2024-01-01"}}},
                {"term": {"service.name": "api"}}
            ]
        }
    },
    "aggs": {
        "status_codes": {
            "terms": {
                "field": "http.response.status_code",
                "size": 10
            }
        }
    },
    "size": 0
}

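# search() returns only the 'hits' object, so aggregations are not included.
# Call the client directly to get the full response.
response = hook.get_conn.search(index="logs-2024-*", body=query)
status_codes = response['aggregations']['status_codes']['buckets']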

for bucket in status_codes:
    print(f"Status {bucket['key']}: {bucket['doc_count']} occurrences")
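
In a full search response, aggregation results sit at the top level next to 'hits'. As a rough sketch of how the terms buckets could be flattened into a plain mapping, the helper below works on such a response; buckets_to_counts is a hypothetical name and the sample response is made-up data.

```python
from typing import Any


def buckets_to_counts(response: dict[str, Any], agg_name: str) -> dict[Any, int]:
    """Map each bucket key of a terms aggregation to its doc_count."""
    aggs = response.get("aggregations", {})
    buckets = aggs.get(agg_name, {}).get("buckets", [])
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


# Made-up sample shaped like a full search response with a terms aggregation
sample_response = {
    "hits": {"total": {"value": 150, "relation": "eq"}, "hits": []},
    "aggregations": {
        "status_codes": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {"key": 200, "doc_count": 120},
                {"key": 404, "doc_count": 25},
                {"key": 500, "doc_count": 5},
            ],
        }
    },
}

counts = buckets_to_counts(sample_response, "status_codes")
for status, doc_count in counts.items():
    print(f"Status {status}: {doc_count} occurrences")
```

The helper returns an empty mapping when the named aggregation is absent, which is one possible design choice; raising an error would be equally reasonable.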

Using Native Elasticsearch Client

# Get direct access to the Elasticsearch client
hook = ElasticsearchPythonHook(
    hosts=["http://localhost:9200"],
    es_conn_args={"request_timeout": 30}  # 'timeout' is deprecated in the 8.x client
)

es_client = hook.get_conn

# Index a document
doc = {
    "timestamp": "2024-01-01T12:00:00",
    "message": "Application started",
    "level": "INFO"
}

response = es_client.index(
    index="application-logs",
    document=doc
)
print(f"Document indexed with ID: {response['_id']}")

# Get cluster health
health = es_client.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Number of nodes: {health['number_of_nodes']}")

# Create an index with mapping
mapping = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "message": {"type": "text"},
            "level": {"type": "keyword"}
        }
    }
}

es_client.indices.create(index="custom-logs", body=mapping)

Bulk Operations

from elasticsearch import helpers

hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
es_client = hook.get_conn

# Bulk index documents
docs = [
    {
        "_index": "bulk-logs",
        "_source": {
            "timestamp": "2024-01-01T12:00:00",
            "message": f"Log message {i}",
            "level": "INFO"
        }
    }
    for i in range(1000)
]

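# helpers.bulk returns (success_count, errors). By default a failed action
# raises BulkIndexError; raise_on_error=False collects failures in the list instead.
success, failed = helpers.bulk(es_client, docs, raise_on_error=False)
print(f"Successfully indexed: {success}, Failed: {len(failed)}")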
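
For larger inputs, the action list does not have to be built in memory, because helpers.bulk accepts any iterable of actions. The sketch below shows one way to produce actions lazily; generate_actions is a hypothetical helper name and the records are made-up.

```python
from typing import Any, Iterable, Iterator


def generate_actions(records: Iterable[dict[str, Any]], index: str) -> Iterator[dict[str, Any]]:
    """Lazily wrap each record in a bulk action targeting the given index."""
    for record in records:
        yield {"_index": index, "_source": record}


# Made-up records; in practice these might be streamed from a file or a query
records = ({"message": f"Log message {i}", "level": "INFO"} for i in range(3))

# Materialized here only to inspect the result
actions = list(generate_actions(records, "bulk-logs"))
print(actions[0])
```

In a real task the generator would be passed straight to the helper, for example helpers.bulk(es_client, generate_actions(records, "bulk-logs")), so that actions are consumed as they are produced.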

Connection Configuration

The hook accepts various connection arguments through the es_conn_args parameter:

Authentication

# Basic authentication
es_conn_args = {
    "basic_auth": ("username", "password")
}

# API Key authentication
es_conn_args = {
    "api_key": ("api_key_id", "api_key_secret")
}

# Bearer token authentication
es_conn_args = {
    "bearer_auth": "bearer_token_string"
}

SSL/TLS Configuration

es_conn_args = {
    "ca_certs": "/path/to/ca.pem",
    "client_cert": "/path/to/client.pem",
    "client_key": "/path/to/client-key.pem",
    "verify_certs": True,
    "ssl_show_warn": False
}

Connection Tuning

es_conn_args = {
    "request_timeout": 30,  # 'timeout' is deprecated in the 8.x client
    "max_retries": 3,
    "retry_on_timeout": True,
    "http_compress": True,
    "headers": {"User-Agent": "My-App/1.0"}
}
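
The authentication, TLS, and tuning options above can be combined in a single es_conn_args dictionary. As an illustration only, the following sketch assembles such a dictionary from a few inputs; build_es_conn_args is a hypothetical helper, not part of the provider, and the rule that rejects two authentication methods at once is a design choice of this sketch.

```python
from typing import Any, Optional


def build_es_conn_args(
    basic_auth: Optional[tuple[str, str]] = None,
    api_key: Optional[tuple[str, str]] = None,
    ca_certs: Optional[str] = None,
    max_retries: int = 3,
) -> dict[str, Any]:
    """Assemble an es_conn_args dictionary from optional auth and TLS inputs."""
    if basic_auth and api_key:
        # Sketch-level rule: pick a single authentication method.
        raise ValueError("Choose either basic_auth or api_key, not both")

    args: dict[str, Any] = {"max_retries": max_retries, "retry_on_timeout": True}
    if basic_auth:
        args["basic_auth"] = basic_auth
    if api_key:
        args["api_key"] = api_key
    if ca_certs:
        # Supplying a CA bundle only makes sense with verification enabled.
        args["ca_certs"] = ca_certs
        args["verify_certs"] = True
    return args


conn_args = build_es_conn_args(
    basic_auth=("username", "password"),
    ca_certs="/path/to/ca.pem",
)
print(conn_args)
```

The resulting dictionary would then be handed to the hook as es_conn_args=conn_args.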

Notes

  • get_conn returns the official Elasticsearch Python client, so every operation the client supports is available (search, index, delete, cluster management, etc.)
  • get_conn is a @cached_property: read it as an attribute (hook.get_conn, without parentheses); the client is created once per hook instance and reused
  • es_conn_args carries the client configuration options, such as authentication, TLS, and retry settings
  • search() returns only the 'hits' object of the response; use it for basic searches, and use get_conn to access the client directly for anything else
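
The caching behaviour mentioned in the notes can be illustrated without an Elasticsearch cluster. The sketch below uses a stand-in class (FakeHook is hypothetical and is not the provider's code) to show how functools.cached_property builds the value on first access and reuses it afterwards.

```python
from functools import cached_property


class FakeHook:
    """Hypothetical stand-in that mimics the hook's cached get_conn pattern."""

    def __init__(self) -> None:
        self.connections_built = 0

    @cached_property
    def get_conn(self) -> object:
        # Runs only on first access; the result is then stored on the instance.
        self.connections_built += 1
        return object()  # placeholder for an Elasticsearch client


hook = FakeHook()
first = hook.get_conn   # attribute access, no parentheses
second = hook.get_conn  # served from the cache, no new client is built
same_client = first is second
print(same_client, hook.connections_built)
```

Because the cache lives on the instance, a new hook object builds its own client on first access.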

