tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities


Task Logging

Advanced logging capabilities that write Airflow task logs to Elasticsearch with support for JSON formatting, external log viewer integration (Kibana), and configurable index patterns. This enables centralized log management and search capabilities for Airflow task execution.

Capabilities

Task Handler Class

Main logging handler that extends FileTaskHandler to write task logs to Elasticsearch with configurable formatting and indexing options.

class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    """
    Handler for logging tasks to Elasticsearch.
    
    Supports writing logs to both local files and Elasticsearch with configurable
    JSON formatting, external viewer integration, and index management.
    """
    
    def __init__(
        self,
        base_log_folder,
        end_of_log_mark="end_of_log",
        write_stdout=True,
        json_format=False,
        json_fields="asctime, filename, lineno, levelname, message",
        host_field="host",
        offset_field="offset",
        filename_template=None,
        elasticsearch_configs=None,
        es_kwargs=None
    ):
        """
        Initialize the Elasticsearch Task Handler.
        
        Parameters:
        - base_log_folder: Base directory for log files
        - end_of_log_mark: Marker string for end of log stream
        - write_stdout: Whether to write to stdout
        - json_format: Whether to format logs as JSON
        - json_fields: Fields to include in JSON logs
        - host_field: Field name for host information
        - offset_field: Field name for log offset
        - filename_template: Template for log file names
        - elasticsearch_configs: Elasticsearch configuration dictionary
        - es_kwargs: Additional Elasticsearch connection arguments
        """
        
    def emit(self, record):
        """
        Emit a log record to both file and Elasticsearch.
        
        Parameters:
        - record: LogRecord instance to emit
        """
        
    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
        """
        Set the logging context for a task instance.
        
        Parameters:
        - ti: TaskInstance to set context for
        - identifier: Optional identifier for the logging context
        """
        
    def close(self) -> None:
        """
        Close the handler and clean up resources.
        """
        
    @property
    def log_name(self) -> str:
        """
        Get the name of the current log.
        
        Returns:
        String name of the log
        """
        
    def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str:
        """
        Get the external log URL (e.g., Kibana) for a task instance.
        
        Parameters:
        - task_instance: TaskInstance to get URL for
        - try_number: Try number for the task execution
        
        Returns:
        String URL for external log viewer
        """
        
    @property
    def supports_external_link(self) -> bool:
        """
        Check if external log links are supported.
        
        Returns:
        Boolean indicating external link support
        """
        
    @staticmethod
    def format_url(host: str) -> str:
        """
        Format the given host string, prefixing 'http' if missing, and validate that it is a well-formed URL.
        
        Parameters:
        - host: The host string to format and check
        
        Returns:
        Properly formatted host URL string
        
        Raises:
        ValueError: If the host is not a valid URL
        """

JSON Formatter Class

Specialized JSON formatter for Elasticsearch log entries with ISO 8601 timestamp formatting.

class ElasticsearchJSONFormatter(JSONFormatter):
    """
    Convert a log record to JSON with ISO 8601 date and time format.
    """
    
    default_time_format = "%Y-%m-%dT%H:%M:%S"
    default_msec_format = "%s.%03d"
    default_tz_format = "%z"
    
    def formatTime(self, record, datefmt=None):
        """
        Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.
        
        Parameters:
        - record: LogRecord instance
        - datefmt: Optional date format string
        
        Returns:
        Formatted timestamp string
        """

Configuration Functions

Utility functions for retrieving and managing Elasticsearch logging configuration.

def get_es_kwargs_from_config() -> dict[str, Any]:
    """
    Get Elasticsearch connection kwargs from Airflow configuration.
    
    Returns:
    Dictionary of Elasticsearch connection parameters
    """

Response Classes

Classes for handling and accessing Elasticsearch search responses and document hits.

class AttributeList:
    """
    Helper class to provide attribute-like access to list objects.
    """
    
    def __init__(self, _list):
        """Initialize with a list object."""
        
    def __getitem__(self, k):
        """Retrieve an item or a slice from the list."""
        
    def __iter__(self):
        """Provide an iterator for the list."""
        
    def __bool__(self):
        """Check if the list is non-empty."""

class AttributeDict:
    """
    Helper class to provide attribute-like access to dictionary objects.
    """
    
    def __init__(self, d):
        """Initialize with a dictionary object."""
        
    def __getattr__(self, attr_name):
        """Retrieve an item as an attribute from the dictionary."""
        
    def __getitem__(self, key):
        """Retrieve an item using a key from the dictionary."""
        
    def to_dict(self):
        """Convert back to regular dictionary."""

class Hit(AttributeDict):
    """
    The Hit class is used to manage and access elements in a document.
    
    It inherits from the AttributeDict class and provides
    attribute-like access to its elements, similar to a dictionary.
    """
    
    def __init__(self, document):
        """Initialize with document data and metadata."""

class HitMeta(AttributeDict):
    """
    The HitMeta class is used to manage and access metadata of a document.
    
    This class inherits from the AttributeDict class and provides
    attribute-like access to its elements.
    """
    
    def __init__(self, document, exclude=("_source", "_fields")):
        """Initialize with document metadata, excluding specified fields."""

class ElasticSearchResponse(AttributeDict):
    """
    The ElasticSearchResponse class is used to manage and access the response from an Elasticsearch search.
    
    This class can be iterated over directly to access hits in the response. Indexing the class instance
    with an integer or slice will also access the hits. The class also evaluates to True
    if there are any hits in the response.
    """
    
    def __init__(self, search, response, doc_class=None):
        """Initialize with search instance, response data, and optional document class."""
        
    def __iter__(self) -> Iterator[Hit]:
        """Provide an iterator over the hits in the Elasticsearch response."""
        
    def __getitem__(self, key):
        """Retrieve a specific hit or a slice of hits from the Elasticsearch response."""
        
    def __bool__(self):
        """Evaluate the presence of hits in the Elasticsearch response."""
        
    @property
    def hits(self) -> list[Hit]:
        """
        Access to the hits (results) of the Elasticsearch response.
        
        The hits are represented as an AttributeList of Hit instances, which allow for easy,
        attribute-like access to the hit data. Hits are lazily loaded upon first access.
        """

Usage Examples

Basic Task Logging Setup

from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

# Configure handler with basic settings
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    write_stdout=True,
    json_format=True,
    elasticsearch_configs={
        "host": "localhost:9200",
        "target_index": "airflow-logs"
    }
)

# Attach the handler to a task's logging context
# (task_instance is the current TaskInstance, obtained from the running task)
from airflow.models import TaskInstance
handler.set_context(task_instance)

# The handler will automatically write logs to Elasticsearch

Advanced Configuration with Kibana Integration

# Advanced configuration with external viewer
handler = ElasticsearchTaskHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True,
    json_fields="asctime, filename, lineno, levelname, message, dag_id, task_id",
    elasticsearch_configs={
        "host": "elasticsearch.example.com:9200",
        "target_index": "airflow-logs-{ds}",
        "frontend": "https://kibana.example.com/app/discover?_a=(query:(language:kuery,query:'log_id: \"{log_id}\"'))",
        "write_to_es": True,
        "verify_certs": True
    },
    es_kwargs={
        "basic_auth": ("elastic", "password"),
        "ca_certs": "/etc/ssl/certs/ca.pem"
    }
)

Custom Log Processing

from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
import logging

class CustomElasticsearchHandler(ElasticsearchTaskHandler):
    def emit(self, record):
        # Add custom fields to log record
        record.custom_field = "custom_value"
        record.environment = "production"
        
        # Call parent emit method
        super().emit(record)
        
    def _format_log_message(self, record):
        # Custom log message formatting
        return f"[{record.levelname}] {record.getMessage()}"

# Use custom handler
handler = CustomElasticsearchHandler(
    base_log_folder="/opt/airflow/logs",
    json_format=True
)

Configuration Options

Airflow Configuration

Configure in airflow.cfg:

[elasticsearch]
# Elasticsearch host
host = localhost:9200

# Log ID template for query construction
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}

# End of log marker
end_of_log_mark = end_of_log

# Kibana frontend URL template
frontend = http://localhost:5601/app/kibana#/discover?_a=(query:(language:kuery,query:'log_id: "{log_id}"'))

# Write to stdout
write_stdout = False

# Write to Elasticsearch
write_to_es = True

# Target index name
target_index = airflow-logs

# JSON formatting
json_format = True
json_fields = asctime, filename, lineno, levelname, message

# Field mappings
host_field = host
offset_field = offset

# Index patterns for search
index_patterns = _all
index_patterns_callable = 

[elasticsearch_configs]
# HTTP compression
http_compress = False

# Certificate verification
verify_certs = True

Dynamic Index Patterns

def custom_index_pattern(task_instance):
    """Custom index pattern based on task instance."""
    dag_id = task_instance.dag_id
    execution_date = task_instance.execution_date
    
    # Create date-based index pattern
    date_str = execution_date.strftime("%Y.%m.%d")
    return f"airflow-{dag_id}-{date_str}"

# Configure in airflow.cfg
# index_patterns_callable = mymodule.custom_index_pattern
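
The callable can be exercised without a running Airflow instance by stubbing the task instance; `SimpleNamespace` stands in for a real `TaskInstance` here:

```python
from datetime import datetime
from types import SimpleNamespace

def custom_index_pattern(task_instance):
    """Same callable as above: date-partitioned index name per DAG."""
    date_str = task_instance.execution_date.strftime("%Y.%m.%d")
    return f"airflow-{task_instance.dag_id}-{date_str}"

# Stub standing in for a TaskInstance (attribute names are all that matter here)
ti = SimpleNamespace(dag_id="example_dag", execution_date=datetime(2024, 1, 15))
print(custom_index_pattern(ti))  # airflow-example_dag-2024.01.15
```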

External Log Viewer Integration

The handler supports integration with external log viewers like Kibana:

# Get external log URL for a task
handler = ElasticsearchTaskHandler(...)
task_instance = TaskInstance(...)

if handler.supports_external_link:
    external_url = handler.get_external_log_url(task_instance, try_number=1)
    print(f"View logs in Kibana: {external_url}")

Notes

  • The handler writes logs to both local files and Elasticsearch simultaneously
  • JSON formatting is recommended for structured log analysis
  • Index patterns support date-based partitioning for better performance
  • External viewer integration requires proper frontend URL configuration
  • The handler supports authentication and SSL/TLS connections to Elasticsearch
  • Log records are automatically enriched with task metadata (dag_id, task_id, etc.)
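
The `log_id_template` shown under Configuration Options determines how stored records are matched back to a task. Rendering it is plain `str.format`; the field values below are invented for illustration:

```python
# Template as shown in the [elasticsearch] configuration section
log_id_template = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"

# Hypothetical task metadata; real values come from the TaskInstance
log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="extract",
    run_id="scheduled__2024-01-15",
    map_index=-1,      # -1 conventionally means the task is not dynamically mapped
    try_number=1,
)
print(log_id)  # example_dag-extract-scheduled__2024-01-15--1-1
```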

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch
