Provider package that enables Elasticsearch integration for Apache Airflow workflows with hooks, logging, and SQL capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Low-level connection utilities and cursor implementations for direct database-style access to Elasticsearch with SQL query support and result pagination. These components provide the foundation for the SQL Hook's database-like interface.
Factory function that creates configured ESConnection instances with authentication and connection parameters.
def connect(
host: str = "localhost",
port: int = 9200,
user: str | None = None,
password: str | None = None,
scheme: str = "http",
**kwargs: Any
) -> ESConnection:
"""
Create an ESConnection instance with specified parameters.
Parameters:
- host: Elasticsearch server hostname (default: "localhost")
- port: Elasticsearch server port (default: 9200)
- user: Username for authentication (optional)
- password: Password for authentication (optional)
- scheme: Connection scheme - "http" or "https" (default: "http")
- **kwargs: Additional connection arguments
Returns:
Configured ESConnection instance
"""Wrapper class for elasticsearch.Elasticsearch that provides database-like connection interface with cursor support.
class ESConnection:
"""
Wrapper class for elasticsearch.Elasticsearch.
Provides a database-like connection interface with cursor support
and SQL query execution capabilities.
"""
def __init__(
self,
host: str = "localhost",
port: int = 9200,
user: str | None = None,
password: str | None = None,
scheme: str = "http",
**kwargs: Any
):
"""
Initialize ESConnection with connection parameters.
Parameters:
- host: Elasticsearch server hostname
- port: Elasticsearch server port
- user: Username for authentication (optional)
- password: Password for authentication (optional)
- scheme: Connection scheme ("http" or "https")
- **kwargs: Additional Elasticsearch client arguments
"""
def cursor(self) -> ElasticsearchSQLCursor:
"""
Create a new cursor for executing SQL queries.
Returns:
ElasticsearchSQLCursor instance for query execution
"""
def close(self):
"""
Close the Elasticsearch connection.
"""
def commit(self):
"""
Commit transaction (no-op for Elasticsearch).
"""
def execute_sql(
self,
query: str,
params: Iterable | Mapping[str, Any] | None = None
) -> ObjectApiResponse:
"""
Execute a SQL query directly on the connection.
Parameters:
- query: SQL query string to execute
- params: Query parameters (optional)
Returns:
ObjectApiResponse from Elasticsearch SQL API
"""PEP 249-like cursor class for executing SQL queries against Elasticsearch with full result pagination support.
class ElasticsearchSQLCursor:
    """
    A PEP 249-like Cursor class for Elasticsearch SQL API.

    Provides standard database cursor interface for SQL query execution
    with support for result pagination and metadata access.
    """

    def __init__(self, es: Elasticsearch, **kwargs):
        """
        Initialize cursor with Elasticsearch client and options.

        Parameters:
        - es: Elasticsearch client instance
        - **kwargs: Additional cursor options (fetch_size, field_multi_value_leniency)
        """

    @property
    def response(self) -> ObjectApiResponse:
        """
        Get the current query response.

        Returns:
        ObjectApiResponse from the last executed query
        """

    @property
    def cursor(self):
        """
        Get the cursor token for pagination.

        Returns:
        Cursor token string for next page, or None if no more results
        """

    @property
    def rows(self):
        """
        Get the rows from the current response.

        Returns:
        List of result rows from current query
        """

    @property
    def rowcount(self) -> int:
        """
        Get the number of rows in the current result set.

        Returns:
        Integer count of rows in current result
        """

    @property
    def description(self) -> list[tuple]:
        """
        Get column descriptions for the result set.

        Returns:
        List of (column_name, column_type) tuples
        """

    def execute(
        self,
        statement: str,
        params: Iterable | Mapping[str, Any] | None = None,
    ) -> ObjectApiResponse:
        """
        Execute a SQL statement.

        Parameters:
        - statement: SQL statement to execute
        - params: Statement parameters (optional)

        Returns:
        ObjectApiResponse from Elasticsearch SQL API
        """

    def fetchone(self):
        """
        Fetch the next row from the result set.

        Returns:
        Single row as list, or None if no more rows
        """

    def fetchmany(self, size: int | None = None):
        """
        Fetch multiple rows from the result set.

        Parameters:
        - size: Number of rows to fetch (optional)

        Raises:
        NotImplementedError (not currently supported)
        """

    def fetchall(self):
        """
        Fetch all remaining rows from the result set.

        Automatically handles pagination using cursor tokens.

        Returns:
        List of all remaining rows
        """

    def close(self):
        """
        Close the cursor and clean up resources.
        """

from airflow.providers.elasticsearch.hooks.elasticsearch import connect
# Create connection
conn = connect(
    host="localhost",
    port=9200,
    user="elastic",
    password="password",
    scheme="https"
)

# Execute SQL query
result = conn.execute_sql("SELECT * FROM my_index WHERE status = 'active' LIMIT 10")
print(f"Found {len(result['rows'])} rows")
for row in result['rows']:
    print(row)

# Close connection
conn.close()

from airflow.providers.elasticsearch.hooks.elasticsearch import connect
# Create connection and cursor
conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Execute query
response = cursor.execute("SELECT name, age, city FROM users WHERE age > 25")

# Access result metadata
print(f"Columns: {cursor.description}")
print(f"Row count: {cursor.rowcount}")

# Fetch results
first_row = cursor.fetchone()
print(f"First row: {first_row}")
all_rows = cursor.fetchall()  # remaining rows after fetchone()
print(f"All rows: {len(all_rows)}")

# Clean up
cursor.close()
conn.close()

conn = connect(host="localhost", port=9200)
cursor = conn.cursor()

# Query with parameters
query = "SELECT * FROM logs WHERE level = ? AND timestamp > ?"
params = ["ERROR", "2024-01-01T00:00:00"]
response = cursor.execute(query, params)

# Process results
for row in cursor.rows:
    print(f"Log entry: {row}")

cursor.close()
conn.close()

conn = connect(
    host="localhost",
    port=9200,
    fetch_size=1000  # Set page size
)
cursor = conn.cursor()

# Execute large query
cursor.execute("SELECT * FROM large_index")

# fetchall() automatically handles pagination
all_results = cursor.fetchall()
print(f"Retrieved {len(all_results)} total rows")

cursor.close()
conn.close()

# Connection with additional Elasticsearch client options
conn = connect(
    host="elasticsearch.example.com",
    port=9200,
    user="service_account",
    password="secret_password",
    scheme="https",
    # Additional Elasticsearch client arguments
    verify_certs=True,
    ca_certs="/path/to/ca.pem",
    timeout=30,
    max_retries=3,
    # NOTE: the elasticsearch-py parameter is retry_on_status (not retry_on_status_code)
    retry_on_status=[502, 503, 504],
    http_compress=True,
    # Cursor options consumed by the SQL cursor
    fetch_size=5000,
    field_multi_value_leniency=True
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM secure_index")
results = cursor.fetchall()
cursor.close()
conn.close()

from elasticsearch.exceptions import ConnectionError, RequestError
conn = connect(host="localhost", port=9200)
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM nonexistent_index")
results = cursor.fetchall()
except ConnectionError as e:
print(f"Connection failed: {e}")
except RequestError as e:
print(f"Query error: {e}")
finally:
cursor.close()
conn.close()The connection accepts various configuration options:
conn = connect(
host="localhost", # Elasticsearch host
port=9200, # Elasticsearch port
user="username", # Authentication username
password="password", # Authentication password
scheme="https", # Connection scheme
# Elasticsearch client options
verify_certs=True, # SSL certificate verification
ca_certs="/path/to/ca.pem", # CA certificate path
client_cert="/path/to/cert.pem", # Client certificate
client_key="/path/to/key.pem", # Client private key
timeout=30, # Request timeout
max_retries=3, # Maximum retry attempts
http_compress=True, # HTTP compression
# Cursor options
fetch_size=1000, # Page size for results
field_multi_value_leniency=False # Multi-value field handling
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-elasticsearch