Python client for Elasticsearch with comprehensive API coverage and both sync and async support
—
ES|QL (Elasticsearch Query Language) provides SQL-like syntax for querying and analyzing Elasticsearch data. The ESQL client offers both synchronous query execution and asynchronous query management for long-running operations.
Execute ES|QL queries synchronously and get immediate results for fast queries.
def query(
self,
*,
query: Optional[str] = None,
allow_partial_results: Optional[bool] = None,
columnar: Optional[bool] = None,
delimiter: Optional[str] = None,
drop_null_columns: Optional[bool] = None,
filter: Optional[Dict[str, Any]] = None,
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
include_ccs_metadata: Optional[bool] = None,
locale: Optional[str] = None,
params: Optional[List[Union[None, bool, float, int, str]]] = None,
profile: Optional[bool] = None,
tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Execute an ES|QL query synchronously.
Parameters:
- query: The ES|QL query string to execute
- allow_partial_results: Return partial results on shard failures if True
- columnar: Return results in columnar format instead of rows
- delimiter: Character to use between CSV values
- drop_null_columns: Remove entirely null columns from results
- filter: Query DSL filter to apply before ES|QL execution
- format: Response format (json, csv, tsv, txt, arrow, etc.)
- include_ccs_metadata: Include cross-cluster search metadata
- locale: Locale for date/time formatting
- params: Parameter values for query placeholders (?)
- profile: Include query execution profile information
- tables: Tables for LOOKUP operations
Returns:
ObjectApiResponse with query results in the specified format
"""

from elasticsearch import Elasticsearch
client = Elasticsearch(['http://localhost:9200'])
# Basic ES|QL query
response = client.esql.query(
query="FROM logs-* | WHERE @timestamp > NOW() - 1 hour | STATS count() BY host"
)
# Query with parameters for safety
response = client.esql.query(
query="FROM employees | WHERE emp_no == ? | KEEP emp_no, first_name, last_name",
params=[10001]
)
# Get results in CSV format
response = client.esql.query(
query="FROM products | STATS avg_price = AVG(price) BY category",
format="csv",
delimiter=","
)
# Columnar format for better performance with large datasets
response = client.esql.query(
query="FROM sales | STATS total = SUM(amount) BY region",
columnar=True
)
# Apply additional Query DSL filter
response = client.esql.query(
query="FROM logs-* | STATS error_count = COUNT() BY service",
filter={
"range": {
"@timestamp": {
"gte": "now-1d"
}
}
}
)
# Use LOOKUP with external tables
lookup_table = {
"departments": {
"dept_id": {"1": "Engineering", "2": "Sales", "3": "Marketing"},
"manager": {"1": "Alice", "2": "Bob", "3": "Carol"}
}
}
response = client.esql.query(
query="""
FROM employees
| LOOKUP departments ON dept_id
| STATS count() BY manager
""",
tables=lookup_table
)

Execute long-running ES|QL queries asynchronously with full lifecycle management.
def async_query(
self,
*,
query: Optional[str] = None,
allow_partial_results: Optional[bool] = None,
columnar: Optional[bool] = None,
delimiter: Optional[str] = None,
drop_null_columns: Optional[bool] = None,
filter: Optional[Dict[str, Any]] = None,
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
include_ccs_metadata: Optional[bool] = None,
keep_alive: Optional[Union[str, int]] = None,
keep_on_completion: Optional[bool] = None,
locale: Optional[str] = None,
params: Optional[List[Union[None, bool, float, int, str]]] = None,
profile: Optional[bool] = None,
tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
wait_for_completion_timeout: Optional[Union[str, int]] = None,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Submit an ES|QL query for asynchronous execution.
Parameters:
- query: The ES|QL query string to execute
- keep_alive: How long to keep the async query available (e.g., "1d", "12h")
- keep_on_completion: Keep the query available after completion
- wait_for_completion_timeout: Max time to wait for immediate completion
- (other parameters same as synchronous query)
Returns:
ObjectApiResponse with query ID and initial status
"""
def async_query_get(
self,
*,
id: str,
drop_null_columns: Optional[bool] = None,
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Get results or status of an asynchronous ES|QL query.
Parameters:
- id: The async query ID returned by async_query
- drop_null_columns: Remove entirely null columns from results
- format: Response format for completed queries
Returns:
ObjectApiResponse with query results or current status
"""
def async_query_delete(
self,
*,
id: str,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Delete an asynchronous ES|QL query and its results.
Parameters:
- id: The async query ID to delete
Returns:
ObjectApiResponse confirming deletion
"""
def async_query_stop(
self,
*,
id: str,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Stop a running asynchronous ES|QL query.
Parameters:
- id: The async query ID to stop
Returns:
ObjectApiResponse confirming the stop request
"""

import time
from elasticsearch import Elasticsearch
client = Elasticsearch(['http://localhost:9200'])
# Submit long-running query asynchronously
response = client.esql.async_query(
query="""
FROM large_dataset
| STATS
avg_value = AVG(value),
max_value = MAX(value),
min_value = MIN(value)
BY category, subcategory
""",
keep_alive="1h",
keep_on_completion=True,
wait_for_completion_timeout="30s"
)
query_id = response.body['id']
is_running = response.body.get('is_running', False)
# Poll for completion
while is_running:
time.sleep(5) # Wait 5 seconds
status_response = client.esql.async_query_get(id=query_id)
is_running = status_response.body.get('is_running', False)
if not is_running:
# Query completed, get results
results = status_response.body
print(f"Query completed with {len(results.get('values', []))} rows")
break
# Clean up - delete the async query
client.esql.async_query_delete(id=query_id)

List and monitor all running ES|QL queries in the cluster.
def list_queries(
self,
**kwargs
) -> ObjectApiResponse[Any]:
"""
List all running ES|QL queries in the cluster.
Returns:
ObjectApiResponse with list of running queries and their status
"""
def get_query(
self,
*,
id: str,
**kwargs
) -> ObjectApiResponse[Any]:
"""
Get information about a specific ES|QL query.
Parameters:
- id: The query ID to get information about
Returns:
ObjectApiResponse with query details and status
"""

# List all running queries
queries = client.esql.list_queries()
for query in queries.body.get('queries', []):
print(f"Query {query['id']}: {query['status']}")
# Get details about a specific query
query_info = client.esql.get_query(id="query_123")
print(f"Query start time: {query_info.body['start_time']}")
print(f"Running time: {query_info.body['running_time_in_nanos']} ns")

Python DSL for building ES|QL queries programmatically with type safety and IDE support.
from elasticsearch.esql import ESQL, and_, or_, not_
class ESQL:
@staticmethod
def from_(*indices: Union[str, Type]) -> "From":
"""Create FROM source command for specified indices."""
@staticmethod
def row(**params: Any) -> "Row":
"""Create ROW source command with specified column values."""
@staticmethod
def show(item: str) -> "Show":
"""Create SHOW source command (item must be 'INFO')."""
@staticmethod
def branch() -> "Branch":
"""Create branch for use within FORK operations."""
# Logical operators for complex conditions
def and_(*conditions) -> Any:
"""Combine conditions with AND logic."""
def or_(*conditions) -> Any:
"""Combine conditions with OR logic."""
def not_(condition) -> Any:
"""Negate a condition with NOT logic."""

from elasticsearch.esql import ESQL, and_, or_, not_
# Build query programmatically
query = (
ESQL.from_("employees")
.where(and_(
"salary > 50000",
or_("department == 'Engineering'", "department == 'Sales'")
))
.stats(avg_salary="AVG(salary)", count="COUNT()")
.by("department")
.sort("avg_salary DESC")
.limit(10)
)
# Execute the built query
response = client.esql.query(query=str(query))
# Row source for testing
test_query = (
ESQL.row(x=1, y=2, z="test")
.eval(sum="x + y")
.keep("sum", "z")
)
# Show cluster information
info_query = ESQL.show("INFO")
cluster_info = client.esql.query(query=str(info_query))

from typing import Any, Dict, List, Optional, Union, Literal
# ES|QL specific types
ESQLQuery = str
ESQLParams = List[Union[None, bool, float, int, str]]
ESQLFormat = Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]
ESQLTables = Dict[str, Dict[str, Dict[str, Any]]]
# Response types
class ESQLResponse:
columns: List[Dict[str, str]] # Column metadata
values: List[List[Any]] # Row data
all_columns: Optional[List[str]] # All column names (when drop_null_columns=True)
class AsyncESQLResponse:
id: str # Async query ID
is_running: bool # Query execution status
is_partial: bool # Partial results available
start_time_in_millis: int # Query start timestamp
expiration_time_in_millis: int # Query expiration timestamp
completion_status: Optional[int] # HTTP status when completed

Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch