Python client for Elasticsearch with comprehensive API coverage and both sync and async support
—
Advanced search capabilities including complex queries, aggregations, scrolling, and search templates. These operations provide comprehensive search functionality for retrieving and analyzing data from Elasticsearch.
Execute multiple search queries in a single request for improved performance.
def msearch(
    self,
    searches: List[Dict[str, Any]],
    index: Optional[str] = None,
    routing: Optional[str] = None,
    max_concurrent_searches: Optional[int] = None,
    rest_total_hits_as_int: Optional[bool] = None,
    typed_keys: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Execute multiple search queries in a single request.

    Only `searches` is required; every other named parameter defaults
    to None.

    Parameters:
    - searches: List of search query dictionaries
    - index: Default index for searches without explicit index
    - routing: Default routing value
    - max_concurrent_searches: Maximum number of concurrent searches
    - rest_total_hits_as_int: Whether to return total hits as integer
    - typed_keys: Whether to prefix aggregation names with type
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with array of search results
    """

Efficiently retrieve large result sets using scroll context.
def scroll(
    self,
    scroll_id: str,
    scroll: str = "5m",
    rest_total_hits_as_int: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Retrieve next batch of results from scroll context.

    Parameters:
    - scroll_id: Scroll context ID from previous search (required)
    - scroll: Time to keep scroll context alive (defaults to "5m")
    - rest_total_hits_as_int: Whether total hits should be integer
      (defaults to None)
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with next batch of results
    """
def clear_scroll(
    self,
    scroll_id: Optional[Union[str, List[str]]] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Clear scroll contexts to free resources.

    Parameters:
    - scroll_id: A single scroll ID or a list of scroll IDs to clear,
      or None to clear all (defaults to None)
    - **kwargs: Extra keyword arguments; not described in this listing

    NOTE(review): the Elasticsearch REST API documents `_all` as the way
    to clear every scroll context; confirm that omitting `scroll_id`
    really clears all contexts before relying on it.

    Returns:
    ObjectApiResponse with clearing results
    """

Create stable snapshots for consistent pagination across result sets.
def open_point_in_time(
    self,
    index: Union[str, List[str]],
    keep_alive: str,
    routing: Optional[str] = None,
    preference: Optional[str] = None,
    ignore_unavailable: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Open a point in time for consistent search pagination.

    `index` and `keep_alive` are required; the remaining named
    parameters default to None.

    Parameters:
    - index: Index name(s) to create point in time for
    - keep_alive: Time to keep point in time alive
    - routing: Custom routing value
    - preference: Node preference
    - ignore_unavailable: Whether to ignore unavailable indices
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with point in time ID
    """
def close_point_in_time(
    self,
    id: str,
    **kwargs
) -> ObjectApiResponse:
    """
    Close a point in time to free resources.

    Parameters:
    - id: Point in time ID to close (required)
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with closing result
    """

Use pre-defined search templates with parameterization.
def search_template(
    self,
    index: Optional[Union[str, List[str]]] = None,
    id: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    source: Optional[str] = None,
    explain: Optional[bool] = None,
    profile: Optional[bool] = None,
    routing: Optional[str] = None,
    scroll: Optional[str] = None,
    preference: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Execute a search using a template.

    Every named parameter is optional and defaults to None.
    NOTE(review): presumably the template is selected by either `id`
    (stored) or `source` (inline) - confirm how the two interact.

    Parameters:
    - index: Index name(s) to search
    - id: Stored template ID
    - params: Template parameters
    - source: Inline template source
    - explain: Whether to include explanation
    - profile: Whether to include profiling
    - routing: Custom routing value
    - scroll: Scroll time for large result sets
    - preference: Node preference
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with search results
    """
def msearch_template(
    self,
    search_templates: List[Dict[str, Any]],
    index: Optional[str] = None,
    routing: Optional[str] = None,
    max_concurrent_searches: Optional[int] = None,
    rest_total_hits_as_int: Optional[bool] = None,
    typed_keys: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Execute multiple templated searches.

    Only `search_templates` is required; every other named parameter
    defaults to None.

    Parameters:
    - search_templates: List of template search specifications
    - index: Default index name
    - routing: Default routing value
    - max_concurrent_searches: Maximum concurrent searches
    - rest_total_hits_as_int: Whether total hits should be integer
    - typed_keys: Whether to prefix aggregation names
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with array of template search results
    """
def render_search_template(
    self,
    id: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
    source: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Render a search template without executing.

    Every named parameter is optional and defaults to None.

    Parameters:
    - id: Stored template ID
    - params: Template parameters
    - source: Inline template source
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with rendered template
    """

Count documents matching queries without retrieving them.
def count(
    self,
    index: Optional[Union[str, List[str]]] = None,
    query: Optional[Dict[str, Any]] = None,
    routing: Optional[str] = None,
    preference: Optional[str] = None,
    analyzer: Optional[str] = None,
    analyze_wildcard: Optional[bool] = None,
    default_operator: Optional[str] = None,
    df: Optional[str] = None,
    lenient: Optional[bool] = None,
    terminate_after: Optional[int] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Count documents matching a query.

    Every named parameter is optional and defaults to None.

    Parameters:
    - index: Index name(s) to count in
    - query: Query to match documents
    - routing: Custom routing value
    - preference: Node preference
    - analyzer: Analyzer for query string
    - analyze_wildcard: Whether to analyze wildcards
    - default_operator: Default boolean operator (AND/OR)
    - df: Default field for query string
    - lenient: Whether to ignore format errors
    - terminate_after: Stop counting after this many docs
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with count result
    """

Discover field mappings and capabilities across indices.
def field_caps(
    self,
    fields: Union[str, List[str]],
    index: Optional[Union[str, List[str]]] = None,
    ignore_unavailable: Optional[bool] = None,
    allow_no_indices: Optional[bool] = None,
    expand_wildcards: Optional[str] = None,
    include_unmapped: Optional[bool] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get field mapping capabilities across indices.

    Only `fields` is required; every other named parameter defaults
    to None.

    Parameters:
    - fields: Field name(s) to analyze
    - index: Index name(s) to analyze
    - ignore_unavailable: Whether to ignore unavailable indices
    - allow_no_indices: Whether empty index list is acceptable
    - expand_wildcards: How to expand wildcard patterns
    - include_unmapped: Whether to include unmapped fields
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with field capabilities
    """

Get detailed explanation of query scoring and matching.
def explain(
    self,
    index: str,
    id: str,
    query: Optional[Dict[str, Any]] = None,
    routing: Optional[str] = None,
    preference: Optional[str] = None,
    analyzer: Optional[str] = None,
    analyze_wildcard: Optional[bool] = None,
    default_operator: Optional[str] = None,
    df: Optional[str] = None,
    lenient: Optional[bool] = None,
    _source: Optional[Union[bool, List[str]]] = None,
    _source_excludes: Optional[List[str]] = None,
    _source_includes: Optional[List[str]] = None,
    stored_fields: Optional[List[str]] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Explain why a document matches or doesn't match a query.

    `index` and `id` are required; every other named parameter
    defaults to None.

    Parameters:
    - index: Index name
    - id: Document ID to explain
    - query: Query to explain against
    - routing: Custom routing value
    - preference: Node preference
    - analyzer: Analyzer for query string
    - analyze_wildcard: Whether to analyze wildcards
    - default_operator: Default boolean operator
    - df: Default field for query string
    - lenient: Whether to ignore format errors
    - _source: Source field configuration (a bool or a list of
      field names, per the annotation)
    - _source_excludes: Source fields to exclude
    - _source_includes: Source fields to include
    - stored_fields: Stored fields to retrieve
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with explanation details
    """

Get information about which shards a search will execute on.
def search_shards(
    self,
    index: Optional[Union[str, List[str]]] = None,
    routing: Optional[str] = None,
    preference: Optional[str] = None,
    local: Optional[bool] = None,
    ignore_unavailable: Optional[bool] = None,
    allow_no_indices: Optional[bool] = None,
    expand_wildcards: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Get search shard information.

    Every named parameter is optional and defaults to None.

    Parameters:
    - index: Index name(s) to get shard info for
    - routing: Custom routing value
    - preference: Node preference
    - local: Whether to return local information only
    - ignore_unavailable: Whether to ignore unavailable indices
    - allow_no_indices: Whether empty index list is acceptable
    - expand_wildcards: How to expand wildcard patterns
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with shard routing information
    """

Evaluate search quality using relevance metrics.
def rank_eval(
    self,
    requests: List[Dict[str, Any]],
    metric: Dict[str, Any],
    index: Optional[Union[str, List[str]]] = None,
    ignore_unavailable: Optional[bool] = None,
    allow_no_indices: Optional[bool] = None,
    expand_wildcards: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Evaluate search result quality.

    `requests` and `metric` are required; every other named parameter
    defaults to None.

    Parameters:
    - requests: List of search requests with relevance judgments
    - metric: Evaluation metric configuration
    - index: Index name(s) to evaluate against
    - ignore_unavailable: Whether to ignore unavailable indices
    - allow_no_indices: Whether empty index list is acceptable
    - expand_wildcards: How to expand wildcard patterns
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with evaluation results
    """

Enumerate terms from field indices for autocomplete and suggestion features.
def terms_enum(
    self,
    index: str,
    field: str,
    size: Optional[int] = None,
    timeout: Optional[str] = None,
    case_insensitive: Optional[bool] = None,
    index_filter: Optional[Dict[str, Any]] = None,
    string: Optional[str] = None,
    search_after: Optional[str] = None,
    **kwargs
) -> ObjectApiResponse:
    """
    Enumerate terms from a field.

    `index` and `field` are required; every other named parameter
    defaults to None.

    Parameters:
    - index: Index name
    - field: Field name to get terms from
    - size: Maximum number of terms to return
    - timeout: Request timeout
    - case_insensitive: Whether matching is case insensitive
    - index_filter: Filter to apply to documents
    - string: String prefix to match terms
    - search_after: Term to search after for pagination
    - **kwargs: Extra keyword arguments; not described in this listing

    Returns:
    ObjectApiResponse with enumerated terms
    """

from elasticsearch import Elasticsearch
client = Elasticsearch(hosts=['http://localhost:9200'])

# Complex search with query, aggregations, and sorting.
# Each part of the request is built separately so it can be read on its own.

# Full-text match on description plus a price window, restricted to
# active products through a filter clause.
product_query = {
    "bool": {
        "must": [
            {"match": {"description": "laptop"}},
            {"range": {"price": {"gte": 500, "lte": 2000}}},
        ],
        "filter": [
            {"term": {"status": "active"}},
        ],
    }
}

# Two aggregations: price buckets and the ten most frequent brands.
product_aggs = {
    "price_ranges": {
        "range": {
            "field": "price",
            "ranges": [
                {"to": 1000},
                {"from": 1000, "to": 1500},
                {"from": 1500},
            ],
        }
    },
    "brands": {
        "terms": {
            "field": "brand.keyword",
            "size": 10,
        }
    },
}

# Sort by ascending price, then by relevance score.
sort_order = [
    {"price": "asc"},
    "_score",
]

response = client.search(
    index="products",
    query=product_query,
    aggs=product_aggs,
    sort=sort_order,
    size=20,
    from_=0,
)

# Process results
matching_products = response.body['hits']['hits']
for hit in matching_products:
    print(f"Product: {hit['_source']['name']}, Price: {hit['_source']['price']}")

# Process aggregations
brand_buckets = response.body['aggregations']['brands']['buckets']
for bucket in brand_buckets:
    print(f"Brand: {bucket['key']}, Count: {bucket['doc_count']}")
# Initial search with scroll
# Open the scroll: the first search returns the first batch together with
# the scroll ID needed to fetch the following batches.
response = client.search(
    index="logs",
    query={"match_all": {}},
    scroll='5m',
    size=1000,
)
scroll_id = response.body['_scroll_id']
hits = response.body['hits']['hits']

# try/finally guarantees the scroll context is released even when
# process_log_entry() or a scroll request raises part-way through;
# otherwise the context would stay open until its keep-alive expires.
try:
    # An empty batch means the scroll is exhausted.
    while hits:
        # Process the current batch
        for hit in hits:
            process_log_entry(hit['_source'])

        # Continue scrolling; always carry the most recent scroll ID forward.
        response = client.scroll(
            scroll_id=scroll_id,
            scroll='5m',
        )
        scroll_id = response.body['_scroll_id']
        hits = response.body['hits']['hits']
finally:
    # Clean up scroll context
    client.clear_scroll(scroll_id=scroll_id)
# Store a search template
client.put_script(
id="product_search_template",
script={
"lang": "mustache",
"source": {
"query": {
"bool": {
"must": [
{"match": {"description": "{{search_term}}"}}
],
"filter": [
{"range": {"price": {"gte": "{{min_price}}", "lte": "{{max_price}}"}}}
]
}
},
"size": "{{size}}",
"sort": [{"{{sort_field}}": "{{sort_order}}"}]
}
}
)
# Use the template
response = client.search_template(
index="products",
id="product_search_template",
params={
"search_term": "laptop",
"min_price": 500,
"max_price": 2000,
"size": 10,
"sort_field": "price",
"sort_order": "asc"
}
)Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch