CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-elasticsearch

Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

connection-management.mddocs/

Connection Management

Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination. These components provide the foundation for the SQL Hook's database-like interface.

Capabilities

Connection Factory Function

Factory function that creates configured ESConnection instances with authentication and connection parameters.

def connect(
    host: str = "localhost",
    port: int = 9200,
    user: str | None = None,
    password: str | None = None,
    scheme: str = "http",
    **kwargs: Any
) -> ESConnection:
    """
    Create an ESConnection instance with specified parameters.
    
    Parameters:
    - host: Elasticsearch server hostname (default: "localhost")
    - port: Elasticsearch server port (default: 9200)
    - user: Username for authentication (optional)
    - password: Password for authentication (optional)
    - scheme: Connection scheme - "http" or "https" (default: "http")
    - **kwargs: Additional connection arguments
    
    Returns:
    Configured ESConnection instance
    """

Connection Class

Wrapper class for elasticsearch.Elasticsearch that provides database-like connection interface with cursor support.

class ESConnection:
    """
    Wrapper class for elasticsearch.Elasticsearch.
    
    Provides a database-like connection interface with cursor support
    and SQL query execution capabilities.
    """
    
    def __init__(
        self,
        host: str = "localhost",
        port: int = 9200,
        user: str | None = None,
        password: str | None = None,
        scheme: str = "http",
        **kwargs: Any
    ):
        """
        Initialize ESConnection with connection parameters.
        
        Parameters:
        - host: Elasticsearch server hostname
        - port: Elasticsearch server port
        - user: Username for authentication (optional)
        - password: Password for authentication (optional)
        - scheme: Connection scheme ("http" or "https")
        - **kwargs: Additional Elasticsearch client arguments
        """
        
    def cursor(self) -> ElasticsearchSQLCursor:
        """
        Create a new cursor for executing SQL queries.
        
        Returns:
        ElasticsearchSQLCursor instance for query execution
        """
        
    def close(self):
        """
        Close the Elasticsearch connection.
        """
        
    def commit(self):
        """
        Commit transaction (no-op for Elasticsearch).
        """
        
    def execute_sql(
        self, 
        query: str, 
        params: Iterable | Mapping[str, Any] | None = None
    ) -> ObjectApiResponse:
        """
        Execute a SQL query directly on the connection.
        
        Parameters:
        - query: SQL query string to execute
        - params: Query parameters (optional)
        
        Returns:
        ObjectApiResponse from Elasticsearch SQL API
        """

Cursor Class

PEP 249-like cursor class for executing SQL queries against Elasticsearch with full result pagination support.

class ElasticsearchSQLCursor:
    """
    A PEP 249-like Cursor class for Elasticsearch SQL API.
    
    Provides standard database cursor interface for SQL query execution
    with support for result pagination and metadata access.
    """
    
    def __init__(self, es: Elasticsearch, **kwargs):
        """
        Initialize cursor with Elasticsearch client and options.
        
        Parameters:
        - es: Elasticsearch client instance
        - **kwargs: Additional cursor options (fetch_size, field_multi_value_leniency)
        """
        
    @property
    def response(self) -> ObjectApiResponse:
        """
        Get the current query response.
        
        Returns:
        ObjectApiResponse from the last executed query
        """
        
    @property
    def cursor(self):
        """
        Get the cursor token for pagination.
        
        Returns:
        Cursor token string for next page, or None if no more results
        """
        
    @property
    def rows(self):
        """
        Get the rows from the current response.
        
        Returns:
        List of result rows from current query
        """
        
    @property
    def rowcount(self) -> int:
        """
        Get the number of rows in the current result set.
        
        Returns:
        Integer count of rows in current result
        """
        
    @property
    def description(self) -> list[tuple]:
        """
        Get column descriptions for the result set.
        
        Returns:
        List of (column_name, column_type) tuples
        """
        
    def execute(
        self, 
        statement: str, 
        params: Iterable | Mapping[str, Any] | None = None
    ) -> ObjectApiResponse:
        """
        Execute a SQL statement.
        
        Parameters:
        - statement: SQL statement to execute
        - params: Statement parameters (optional)
        
        Returns:
        ObjectApiResponse from Elasticsearch SQL API
        """
        
    def fetchone(self):
        """
        Fetch the next row from the result set.
        
        Returns:
        Single row as list, or None if no more rows
        """
        
    def fetchmany(self, size: int | None = None):
        """
        Fetch multiple rows from the result set.
        
        Parameters:
        - size: Number of rows to fetch (optional)
        
        Raises:
        NotImplementedError (not currently supported)
        """
        
    def fetchall(self):
        """
        Fetch all remaining rows from the result set.
        
        Automatically handles pagination using cursor tokens.
        
        Returns:
        List of all remaining rows
        """
        
    def close(self):
        """
        Close the cursor and clean up resources.
        """

Usage Examples

Basic Connection and Query

from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Create connection
conn = connect(
    host="localhost",
    port=9200,
    user="elastic",
    password="password",
    scheme="https"
)

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")

print(f"Found {len(result['rows'])} rows")
for row in result['rows']:
    print(row)

# Close connection
conn.close()

Cursor-based Query Execution

from airflow.providers.elasticsearch.hooks.elasticsearch import connect

# Create connection and cursor
conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Execute query
response = cursor.execute("SELECT name, age, city FROM users WHERE age > 25")

# Access result metadata
print(f"Columns: {cursor.description}")
print(f"Row count: {cursor.rowcount}")

# Fetch results
first_row = cursor.fetchone()
print(f"First row: {first_row}")

all_rows = cursor.fetchall()
print(f"All rows: {len(all_rows)}")

# Clean up
cursor.close()
conn.close()

Parameterized Queries

conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Query with parameters
query = "SELECT * FROM logs WHERE level = ? AND timestamp > ?"
params = ["ERROR", "2024-01-01T00:00:00"]

response = cursor.execute(query, params)

# Process results
for row in cursor.rows:
    print(f"Log entry: {row}")

cursor.close()
conn.close()

Pagination with Large Result Sets

conn = connect(
    host="localhost", 
    port=9200,
    fetch_size=1000  # Set page size
)
cursor = conn.cursor()

# Execute large query
cursor.execute("SELECT * FROM large_index")

# fetchall() automatically handles pagination
all_results = cursor.fetchall()
print(f"Retrieved {len(all_results)} total rows")

cursor.close()
conn.close()

Advanced Connection Configuration

# Connection with additional Elasticsearch client options
conn = connect(
    host="elasticsearch.example.com",
    port=9200,
    user="service_account",
    password="secret_password",
    scheme="https",
    # Additional Elasticsearch client arguments
    verify_certs=True,
    ca_certs="/path/to/ca.pem",
    timeout=30,
    max_retries=3,
    retry_on_status_code=[502, 503, 504],
    http_compress=True,
    fetch_size=5000,
    field_multi_value_leniency=True
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM secure_index")
results = cursor.fetchall()

cursor.close()
conn.close()

Error Handling

from elasticsearch.exceptions import ConnectionError, RequestError

conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

try:
    cursor.execute("SELECT * FROM nonexistent_index")
    results = cursor.fetchall()
except ConnectionError as e:
    print(f"Connection failed: {e}")
except RequestError as e:
    print(f"Query error: {e}")
finally:
    cursor.close()
    conn.close()

Configuration Options

Connection Parameters

The connection accepts various configuration options:

conn = connect(
    host="localhost",              # Elasticsearch host
    port=9200,                     # Elasticsearch port
    user="username",               # Authentication username
    password="password",           # Authentication password
    scheme="https",                # Connection scheme
    
    # Elasticsearch client options
    verify_certs=True,             # SSL certificate verification
    ca_certs="/path/to/ca.pem",    # CA certificate path
    client_cert="/path/to/cert.pem", # Client certificate
    client_key="/path/to/key.pem", # Client private key
    timeout=30,                    # Request timeout
    max_retries=3,                 # Maximum retry attempts
    http_compress=True,            # HTTP compression
    
    # Cursor options
    fetch_size=1000,               # Page size for results
    field_multi_value_leniency=False  # Multi-value field handling
)

Notes

  • The connection wrapper provides a database-like interface over Elasticsearch's native client
  • All SQL operations use Elasticsearch's SQL API for query execution
  • Cursor pagination is automatically handled for large result sets
  • The connection supports all standard Elasticsearch client configuration options
  • Parameter binding is supported for secure query execution
  • The cursor implementation follows PEP 249 database API standards where applicable

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch

docs

connection-management.md

index.md

python-hook.md

sql-hook.md

task-logging.md

version-compatibility.md

tile.json