0
# ES|QL Operations
1
2
ES|QL (Elasticsearch Query Language) provides SQL-like syntax for querying and analyzing Elasticsearch data. The ESQL client offers both synchronous query execution and asynchronous query management for long-running operations.
3
4
## Capabilities
5
6
### Synchronous Query Execution
7
8
Execute ES|QL queries synchronously and get immediate results for fast queries.
9
10
```python { .api }
11
def query(
12
self,
13
*,
14
query: Optional[str] = None,
15
allow_partial_results: Optional[bool] = None,
16
columnar: Optional[bool] = None,
17
delimiter: Optional[str] = None,
18
drop_null_columns: Optional[bool] = None,
19
filter: Optional[Dict[str, Any]] = None,
20
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
21
include_ccs_metadata: Optional[bool] = None,
22
locale: Optional[str] = None,
23
params: Optional[List[Union[None, bool, float, int, str]]] = None,
24
profile: Optional[bool] = None,
25
tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
26
**kwargs
27
) -> ObjectApiResponse[Any]:
28
"""
29
Execute an ES|QL query synchronously.
30
31
Parameters:
32
- query: The ES|QL query string to execute
33
- allow_partial_results: Return partial results on shard failures if True
34
- columnar: Return results in columnar format instead of rows
35
- delimiter: Character to use between CSV values
36
- drop_null_columns: Remove entirely null columns from results
37
- filter: Query DSL filter to apply before ES|QL execution
38
- format: Response format (json, csv, tsv, txt, arrow, etc.)
39
- include_ccs_metadata: Include cross-cluster search metadata
40
- locale: Locale for date/time formatting
41
- params: Parameter values for query placeholders (?)
42
- profile: Include query execution profile information
43
- tables: Tables for LOOKUP operations
44
45
Returns:
46
ObjectApiResponse with query results in specified format
47
"""
48
```
49
50
#### Usage Examples
51
52
```python
53
from elasticsearch import Elasticsearch
54
55
client = Elasticsearch(['http://localhost:9200'])
56
57
# Basic ES|QL query
58
response = client.esql.query(
59
query="FROM logs-* | WHERE @timestamp > NOW() - 1 hour | STATS count() BY host"
60
)
61
62
# Query with parameters for safety
63
response = client.esql.query(
64
query="FROM employees | WHERE emp_no == ? | KEEP emp_no, first_name, last_name",
65
params=[10001]
66
)
67
68
# Get results in CSV format
69
response = client.esql.query(
70
query="FROM products | STATS avg_price = AVG(price) BY category",
71
format="csv",
72
delimiter=","
73
)
74
75
# Columnar format for better performance with large datasets
76
response = client.esql.query(
77
query="FROM sales | STATS total = SUM(amount) BY region",
78
columnar=True
79
)
80
81
# Apply additional Query DSL filter
82
response = client.esql.query(
83
query="FROM logs-* | STATS error_count = COUNT() BY service",
84
filter={
85
"range": {
86
"@timestamp": {
87
"gte": "now-1d"
88
}
89
}
90
}
91
)
92
93
# Use LOOKUP with external tables
94
lookup_table = {
95
"departments": {
96
"dept_id": {"1": "Engineering", "2": "Sales", "3": "Marketing"},
97
"manager": {"1": "Alice", "2": "Bob", "3": "Carol"}
98
}
99
}
100
101
response = client.esql.query(
102
query="""
103
FROM employees
104
| LOOKUP departments ON dept_id
105
| STATS count() BY manager
106
""",
107
tables=lookup_table
108
)
109
```
110
111
### Asynchronous Query Management
112
113
Execute long-running ES|QL queries asynchronously with full lifecycle management.
114
115
```python { .api }
116
def async_query(
117
self,
118
*,
119
query: Optional[str] = None,
120
allow_partial_results: Optional[bool] = None,
121
columnar: Optional[bool] = None,
122
delimiter: Optional[str] = None,
123
drop_null_columns: Optional[bool] = None,
124
filter: Optional[Dict[str, Any]] = None,
125
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
126
include_ccs_metadata: Optional[bool] = None,
127
keep_alive: Optional[Union[str, int]] = None,
128
keep_on_completion: Optional[bool] = None,
129
locale: Optional[str] = None,
130
params: Optional[List[Union[None, bool, float, int, str]]] = None,
131
profile: Optional[bool] = None,
132
tables: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
133
wait_for_completion_timeout: Optional[Union[str, int]] = None,
134
**kwargs
135
) -> ObjectApiResponse[Any]:
136
"""
137
Submit an ES|QL query for asynchronous execution.
138
139
Parameters:
140
- query: The ES|QL query string to execute
141
- keep_alive: How long to keep the async query available (e.g., "1d", "12h")
142
- keep_on_completion: Keep the query available after completion
143
- wait_for_completion_timeout: Max time to wait for immediate completion
144
- (other parameters same as synchronous query)
145
146
Returns:
147
ObjectApiResponse with query ID and initial status
148
"""
149
150
def async_query_get(
151
self,
152
*,
153
id: str,
154
drop_null_columns: Optional[bool] = None,
155
format: Optional[Union[str, Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]]] = None,
156
**kwargs
157
) -> ObjectApiResponse[Any]:
158
"""
159
Get results or status of an asynchronous ES|QL query.
160
161
Parameters:
162
- id: The async query ID returned by async_query
163
- drop_null_columns: Remove entirely null columns from results
164
- format: Response format for completed queries
165
166
Returns:
167
ObjectApiResponse with query results or current status
168
"""
169
170
def async_query_delete(
171
self,
172
*,
173
id: str,
174
**kwargs
175
) -> ObjectApiResponse[Any]:
176
"""
177
Delete an asynchronous ES|QL query and its results.
178
179
Parameters:
180
- id: The async query ID to delete
181
182
Returns:
183
ObjectApiResponse confirming deletion
184
"""
185
186
def async_query_stop(
187
self,
188
*,
189
id: str,
190
**kwargs
191
) -> ObjectApiResponse[Any]:
192
"""
193
Stop a running asynchronous ES|QL query.
194
195
Parameters:
196
- id: The async query ID to stop
197
198
Returns:
199
ObjectApiResponse confirming the stop request
200
"""
201
```
202
203
#### Async Usage Examples
204
205
```python
206
import time
207
from elasticsearch import Elasticsearch
208
209
client = Elasticsearch(['http://localhost:9200'])
210
211
# Submit long-running query asynchronously
212
response = client.esql.async_query(
213
query="""
214
FROM large_dataset
215
| STATS
216
avg_value = AVG(value),
217
max_value = MAX(value),
218
min_value = MIN(value)
219
BY category, subcategory
220
""",
221
keep_alive="1h",
222
keep_on_completion=True,
223
wait_for_completion_timeout="30s"
224
)
225
226
query_id = response.body['id']
227
is_running = response.body.get('is_running', False)
228
229
# Poll for completion
230
while is_running:
231
time.sleep(5) # Wait 5 seconds
232
status_response = client.esql.async_query_get(id=query_id)
233
is_running = status_response.body.get('is_running', False)
234
235
if not is_running:
236
# Query completed, get results
237
results = status_response.body
238
print(f"Query completed with {len(results.get('values', []))} rows")
239
break
240
241
# Clean up - delete the async query
242
client.esql.async_query_delete(id=query_id)
243
```
244
245
### Query Management
246
247
List and monitor all running ES|QL queries in the cluster.
248
249
```python { .api }
250
def list_queries(
251
self,
252
**kwargs
253
) -> ObjectApiResponse[Any]:
254
"""
255
List all running ES|QL queries in the cluster.
256
257
Returns:
258
ObjectApiResponse with list of running queries and their status
259
"""
260
261
def get_query(
262
self,
263
*,
264
id: str,
265
**kwargs
266
) -> ObjectApiResponse[Any]:
267
"""
268
Get information about a specific ES|QL query.
269
270
Parameters:
271
- id: The query ID to get information about
272
273
Returns:
274
ObjectApiResponse with query details and status
275
"""
276
```
277
278
#### Query Management Examples
279
280
```python
281
# List all running queries
282
queries = client.esql.list_queries()
283
for query in queries.body.get('queries', []):
284
print(f"Query {query['id']}: {query['status']}")
285
286
# Get details about a specific query
287
query_info = client.esql.get_query(id="query_123")
288
print(f"Query start time: {query_info.body['start_time']}")
289
print(f"Running time: {query_info.body['running_time_in_nanos']} ns")
290
```
291
292
## ES|QL Query Building (DSL)
293
294
Python DSL for building ES|QL queries programmatically with type safety and IDE support.
295
296
```python { .api }
297
from elasticsearch.esql import ESQL, and_, or_, not_
298
299
class ESQL:
300
@staticmethod
301
def from_(*indices: Union[str, Type]) -> "From":
302
"""Create FROM source command for specified indices."""
303
304
@staticmethod
305
def row(**params: Any) -> "Row":
306
"""Create ROW source command with specified column values."""
307
308
@staticmethod
309
def show(item: str) -> "Show":
310
"""Create SHOW source command (item must be 'INFO')."""
311
312
@staticmethod
313
def branch() -> "Branch":
314
"""Create branch for use within FORK operations."""
315
316
# Logical operators for complex conditions
317
def and_(*conditions) -> Any:
318
"""Combine conditions with AND logic."""
319
320
def or_(*conditions) -> Any:
321
"""Combine conditions with OR logic."""
322
323
def not_(condition) -> Any:
324
"""Negate a condition with NOT logic."""
325
```
326
327
### DSL Usage Examples
328
329
```python
330
from elasticsearch.esql import ESQL, and_, or_, not_
331
332
# Build query programmatically
333
query = (
334
ESQL.from_("employees")
335
.where(and_(
336
"salary > 50000",
337
or_("department == 'Engineering'", "department == 'Sales'")
338
))
339
.stats(avg_salary="AVG(salary)", count="COUNT()")
340
.by("department")
341
.sort("avg_salary", "DESC")
342
.limit(10)
343
)
344
345
# Execute the built query
346
response = client.esql.query(query=str(query))
347
348
# Row source for testing
349
test_query = (
350
ESQL.row(x=1, y=2, z="test")
351
.eval(sum="x + y")
352
.keep("sum", "z")
353
)
354
355
# Show cluster information
356
info_query = ESQL.show("INFO")
357
cluster_info = client.esql.query(query=str(info_query))
358
```
359
360
## Types
361
362
```python { .api }
363
from typing import Any, Dict, List, Optional, Union, Literal
364
365
# ES|QL specific types
366
ESQLQuery = str
367
ESQLParams = List[Union[None, bool, float, int, str]]
368
ESQLFormat = Literal["arrow", "cbor", "csv", "json", "smile", "tsv", "txt", "yaml"]
369
ESQLTables = Dict[str, Dict[str, Dict[str, Any]]]
370
371
# Response types
372
class ESQLResponse:
373
columns: List[Dict[str, str]] # Column metadata
374
values: List[List[Any]] # Row data
375
all_columns: Optional[List[str]] # All column names (when drop_null_columns=True)
376
377
class AsyncESQLResponse:
378
id: str # Async query ID
379
is_running: bool # Query execution status
380
is_partial: bool # Partial results available
381
start_time_in_millis: int # Query start timestamp
382
expiration_time_in_millis: int # Query expiration timestamp
383
completion_status: Optional[int] # HTTP status when completed
384
```