CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

docs/esql-operations.md

ES|QL Operations

ES|QL (Elasticsearch Query Language) provides SQL-like syntax for querying and analyzing Elasticsearch data. The ESQL client offers both synchronous query execution and asynchronous query management for long-running operations.

Capabilities

Synchronous Query Execution

Execute ES|QL queries synchronously and get immediate results for fast queries.

def query(
    self,
    *,
    query: Optional[str] = None,
    allow_partial_results: Optional[bool] = None,
    columnar: Optional[bool] = None,
    delimiter: Optional[str] = None,
    drop_null_columns: Optional[bool] = None,
    filter: Optional[Dict[str, Any]] = None,
    format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
    include_ccs_metadata: Optional[bool] = None,
    locale: Optional[str] = None,
    params: Optional[List[Union[None, bool, float, int, str]]] = None,
    profile: Optional[bool] = None,
    tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Execute an ES|QL query synchronously.

    All arguments are keyword-only.

    Parameters:
    - query: The ES|QL query string to execute
    - allow_partial_results: Return partial results on shard failures if True
    - columnar: Return results in columnar format instead of rows
    - delimiter: Character to use between CSV values
    - drop_null_columns: Remove entirely null columns from results
    - filter: Query DSL filter to apply before ES|QL execution
    - format: Response format; the signature lists arrow, cbor, csv, json,
      smile, tsv, txt and yaml
    - include_ccs_metadata: Include cross-cluster search metadata
    - locale: Locale for date/time formatting
    - params: Parameter values for query placeholders (?); each value is
      None, bool, float, int or str
    - profile: Include query execution profile information
    - tables: Tables for LOOKUP operations
    - **kwargs: Additional keyword arguments (not described in this document)

    Returns:
    ObjectApiResponse with query results in specified format
    """

Usage Examples

from elasticsearch import Elasticsearch

client = Elasticsearch(['http://localhost:9200'])

# Basic ES|QL query
response = client.esql.query(
    query="FROM logs-* | WHERE @timestamp > NOW() - 1 hour | STATS count() BY host"
)

# Query with parameters for safety
response = client.esql.query(
    query="FROM employees | WHERE emp_no == ? | KEEP emp_no, first_name, last_name",
    params=[10001]
)

# Get results in CSV format
response = client.esql.query(
    query="FROM products | STATS avg_price = AVG(price) BY category",
    format="csv",
    delimiter=","
)

# Columnar format: results are returned per column instead of per row
response = client.esql.query(
    query="FROM sales | STATS total = SUM(amount) BY region",
    columnar=True
)

# Apply additional Query DSL filter
response = client.esql.query(
    query="FROM logs-* | STATS error_count = COUNT() BY service",
    filter={
        "range": {
            "@timestamp": {
                "gte": "now-1d"
            }
        }
    }
)

# Use LOOKUP with external tables.
# Shape: table name -> column name -> {value type: [one value per row]}.
# Rows are aligned by position, so dept_id 1 is managed by Alice, and so on.
lookup_table = {
    "departments": {
        "dept_id": {"integer": [1, 2, 3]},
        "manager": {"keyword": ["Alice", "Bob", "Carol"]}
    }
}

response = client.esql.query(
    query="""
    FROM employees 
    | LOOKUP departments ON dept_id 
    | STATS count() BY manager
    """,
    tables=lookup_table
)

Asynchronous Query Management

Execute long-running ES|QL queries asynchronously with full lifecycle management.

def async_query(
    self,
    *,
    query: Optional[str] = None,
    allow_partial_results: Optional[bool] = None,
    columnar: Optional[bool] = None,
    delimiter: Optional[str] = None,
    drop_null_columns: Optional[bool] = None,
    filter: Optional[Dict[str, Any]] = None,
    format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
    include_ccs_metadata: Optional[bool] = None,
    keep_alive: Optional[Union[str, int]] = None,
    keep_on_completion: Optional[bool] = None,
    locale: Optional[str] = None,
    params: Optional[List[Union[None, bool, float, int, str]]] = None,
    profile: Optional[bool] = None,
    tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
    wait_for_completion_timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Submit an ES|QL query for asynchronous execution.

    All arguments are keyword-only.

    Parameters specific to asynchronous execution:
    - keep_alive: How long to keep the async query available (e.g., "1d", "12h")
    - keep_on_completion: Keep the query available after completion
    - wait_for_completion_timeout: Max time to wait for immediate completion

    Parameters shared with the synchronous query method:
    - query: The ES|QL query string to execute
    - allow_partial_results: Return partial results on shard failures if True
    - columnar: Return results in columnar format instead of rows
    - delimiter: Character to use between CSV values
    - drop_null_columns: Remove entirely null columns from results
    - filter: Query DSL filter to apply before ES|QL execution
    - format: Response format (json, csv, tsv, txt, arrow, etc.)
    - include_ccs_metadata: Include cross-cluster search metadata
    - locale: Locale for date/time formatting
    - params: Parameter values for query placeholders (?)
    - profile: Include query execution profile information
    - tables: Tables for LOOKUP operations

    Returns:
    ObjectApiResponse with query ID and initial status
    """

def async_query_get(
    self,
    *,
    id: str,
    drop_null_columns: Optional[bool] = None,
    format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
    keep_alive: Optional[Union[str, int]] = None,
    wait_for_completion_timeout: Optional[Union[str, int]] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Get results or status of an asynchronous ES|QL query.

    All arguments are keyword-only.

    Parameters:
    - id: The async query ID returned by async_query
    - drop_null_columns: Remove entirely null columns from results
    - format: Response format for completed queries
    - keep_alive: How long to keep the async query and its results
      available (e.g., "1d", "12h")
    - wait_for_completion_timeout: Max time to wait for the query to
      complete before returning its current status

    Returns:
    ObjectApiResponse with query results or current status
    """

def async_query_delete(
    self,
    *,
    id: str,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Delete an asynchronous ES|QL query and its results.

    Parameters:
    - id: The async query ID to delete (keyword-only), as returned by
      async_query

    Returns:
    ObjectApiResponse confirming deletion
    """

def async_query_stop(
    self,
    *,
    id: str,
    drop_null_columns: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Stop a running asynchronous ES|QL query.

    Stopping interrupts execution and returns whatever results have been
    gathered so far.

    Parameters:
    - id: The async query ID to stop
    - drop_null_columns: Remove entirely null columns from results

    Returns:
    ObjectApiResponse with the results collected before the query was stopped
    """

Async Usage Examples

import time
from elasticsearch import Elasticsearch

client = Elasticsearch(['http://localhost:9200'])

# Submit long-running query asynchronously
response = client.esql.async_query(
    query="""
    FROM large_dataset 
    | STATS 
        avg_value = AVG(value),
        max_value = MAX(value),
        min_value = MIN(value)
    BY category, subcategory
    """,
    keep_alive="1h",
    keep_on_completion=True,
    wait_for_completion_timeout="30s"
)

query_id = response.body['id']

# If the query finished within wait_for_completion_timeout, the initial
# response already holds the results, so start from it instead of assuming
# at least one poll is needed.
results = response.body

# Poll until the query is no longer running
while results.get('is_running', False):
    time.sleep(5)  # Wait 5 seconds between status checks
    results = client.esql.async_query_get(id=query_id).body

# Query completed (either immediately or after polling)
print(f"Query completed with {len(results.get('values', []))} rows")

# Clean up - delete the async query
client.esql.async_query_delete(id=query_id)

Query Management

List and monitor all running ES|QL queries in the cluster.

def list_queries(
    self,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    List all running ES|QL queries in the cluster.

    Takes no required arguments.

    Returns:
    ObjectApiResponse with list of running queries and their status
    """

def get_query(
    self,
    *,
    id: str,
    **kwargs
) -> ObjectApiResponse[Any]:
    """
    Get information about a specific ES|QL query.

    Parameters:
    - id: The query ID to get information about (keyword-only)

    Returns:
    ObjectApiResponse with query details and status
    """

Query Management Examples

# List all running queries.
# The "queries" field maps each query ID to that query's details, so iterate
# over items() rather than treating it as a list.
queries = client.esql.list_queries()
for query_id, query in queries.body.get('queries', {}).items():
    print(f"Query {query_id}: {query['query']}")

# Get details about a specific query
query_info = client.esql.get_query(id="query_123")
print(f"Query start time: {query_info.body['start_time_millis']} ms since epoch")
print(f"Running time: {query_info.body['running_time_nanos']} ns")

ES|QL Query Building (DSL)

Python DSL for building ES|QL queries programmatically with type safety and IDE support.

from elasticsearch.esql import ESQL, and_, or_, not_

class ESQL:
    """
    Entry points for building ES|QL queries programmatically.

    Each static method creates a source command (FROM, ROW, SHOW) or a
    branch for use within FORK operations.
    """

    @staticmethod
    def from_(*indices: Union[str, Type]) -> "From":
        """Create FROM source command for specified indices."""

    @staticmethod
    def row(**params: Any) -> "Row":
        """Create ROW source command with specified column values."""

    @staticmethod
    def show(item: str) -> "Show":
        """Create SHOW source command (item must be 'INFO')."""

    @staticmethod
    def branch() -> "Branch":
        """Create branch for use within FORK operations."""

# Logical operators for complex conditions.
# The trailing underscore keeps these names from clashing with the Python
# keywords and, or and not.
def and_(*conditions) -> Any:
    """Combine conditions with AND logic."""

def or_(*conditions) -> Any:
    """Combine conditions with OR logic."""

def not_(condition) -> Any:
    """Negate a condition with NOT logic."""

DSL Usage Examples

from elasticsearch.esql import ESQL, and_, or_, not_

# Build query programmatically
query = (
    ESQL.from_("employees")
    .where(and_(
        "salary > 50000",
        or_("department == 'Engineering'", "department == 'Sales'")
    ))
    .stats(avg_salary="AVG(salary)", count="COUNT()")
    .by("department")
    # Each sort() argument is a separate sort expression, so the direction
    # must be part of the expression rather than a second argument.
    .sort("avg_salary DESC")
    .limit(10)
)

# Execute the built query
response = client.esql.query(query=str(query))

# Row source for testing
test_query = (
    ESQL.row(x=1, y=2, z="test")
    .eval(sum="x + y")
    .keep("sum", "z")
)

# Show cluster information
info_query = ESQL.show("INFO")
cluster_info = client.esql.query(query=str(info_query))

Types

from typing import Any, Dict, List, Optional, Union, Literal

# ES|QL specific types
ESQLQuery = str
ESQLParams = List[Union[None, bool, float, int, str]]
ESQLFormat = Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]
ESQLTables = Dict[str, Dict[str, Dict[str, Any]]]

# Response types
class ESQLResponse:
    """Shape of the body returned for a completed ES|QL query."""
    columns: List[Dict[str, str]]  # Column metadata
    values: List[List[Any]]        # Row data
    # Metadata for every column, including those removed from `columns`
    # because they were entirely null (when drop_null_columns=True).
    # Entries have the same shape as `columns`, not bare column names.
    all_columns: Optional[List[Dict[str, str]]]

class AsyncESQLResponse:
    """Fields describing an asynchronous ES|QL query, as listed in this document."""
    id: str                        # Async query ID
    is_running: bool              # Query execution status
    is_partial: bool              # Partial results available
    start_time_in_millis: int     # Query start timestamp
    expiration_time_in_millis: int # Query expiration timestamp
    # NOTE(review): confirm against the server response that this field is
    # returned for ES|QL async queries.
    completion_status: Optional[int] # HTTP status when completed

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json