Python client for Elasticsearch with comprehensive API coverage and both sync and async support
---
The Elasticsearch DSL (Domain Specific Language) provides a Pythonic way to construct search queries, aggregations, and document models. It offers a high-level interface that generates Elasticsearch JSON queries while maintaining the full power and flexibility of the underlying query language.
The main search interface for constructing and executing queries.
class Search:
    """
    Main search interface for building and executing queries.

    All methods are stubs (``...``) documenting the public API. Per the
    usage examples later in this document, builder methods are chained
    and their results reassigned (``s = s.query(...)``).
    """

    def __init__(
        self,
        using=None,
        index=None,
        doc_type=None,
        extra=None,
    ): ...

    # Query construction
    def query(self, q): ...
    def filter(self, f): ...
    def exclude(self, f): ...
    def post_filter(self, f): ...
    def aggregate(self, name, agg): ...

    # Result shaping
    def sort(self, *keys): ...
    def source(self, fields=None, **kwargs): ...
    def highlight(self, *args, **kwargs): ...
    def suggest(self, name, text, **kwargs): ...
    def script_fields(self, **kwargs): ...
    def from_(self, from_): ...  # trailing underscore avoids the `from` keyword
    def size(self, size): ...

    # Connection / target configuration
    def params(self, **kwargs): ...
    def index(self, *index): ...
    def doc_type(self, *doc_type): ...
    def using(self, client): ...

    # Execution
    def execute(self, ignore_cache=False): ...
    def to_dict(self): ...
    def count(self): ...
    def delete(self): ...
    def scan(self): ...
class AsyncSearch:
    """
    Async version of Search for use with AsyncElasticsearch.
    """
    # Same methods as Search but async
class MultiSearch:
    """
    Multi-search interface for executing multiple searches in a single
    request.
    """

    def __init__(self, using=None, index=None): ...
    def add(self, search): ...  # queue a Search for the batch
    def execute(self): ...      # run all queued searches
    def to_dict(self): ...
class AsyncMultiSearch:
    """
    Async version of MultiSearch.
    """

    async def execute(self): ...
class UpdateByQuery:
    """
    Update-by-query interface for bulk updates using scripts.
    """

    def __init__(self, using=None, index=None): ...
    def query(self, q): ...      # restrict which documents are updated
    def script(self, **kwargs): ...  # script applied to each matching doc
    def execute(self): ...
class AsyncUpdateByQuery:
"""
Async version of UpdateByQuery.
"""
async def execute(self): ...
async def execute(self, ignore_cache=False): ...
async def count(self): ...
async def delete(self): ...
def scan(self): ... # Returns async generatorConstruct various types of Elasticsearch queries.
class Q:
    """
    Base query builder with a classmethod per Elasticsearch query type.

    Every method is a stub (``...``) documenting its constructor
    signature; keyword arguments carry the query's parameters.
    """

    # Full-text queries
    @classmethod
    def match(cls, **kwargs): ...
    @classmethod
    def match_all(cls, **kwargs): ...
    @classmethod
    def match_phrase(cls, **kwargs): ...
    @classmethod
    def match_phrase_prefix(cls, **kwargs): ...
    @classmethod
    def multi_match(cls, query, fields, **kwargs): ...
    @classmethod
    def query_string(cls, query, **kwargs): ...
    @classmethod
    def simple_query_string(cls, query, **kwargs): ...

    # Term-level queries
    @classmethod
    def term(cls, **kwargs): ...
    @classmethod
    def terms(cls, **kwargs): ...
    @classmethod
    def range(cls, **kwargs): ...
    @classmethod
    def exists(cls, field): ...
    @classmethod
    def missing(cls, field): ...
    @classmethod
    def ids(cls, values, **kwargs): ...
    @classmethod
    def prefix(cls, **kwargs): ...
    @classmethod
    def wildcard(cls, **kwargs): ...
    @classmethod
    def regexp(cls, **kwargs): ...
    @classmethod
    def fuzzy(cls, **kwargs): ...

    # Compound and joining queries
    @classmethod
    def bool(cls, must=None, should=None, must_not=None, filter=None, **kwargs): ...
    @classmethod
    def nested(cls, path, query, **kwargs): ...
    @classmethod
    def has_child(cls, type, query, **kwargs): ...
    @classmethod
    def has_parent(cls, type, query, **kwargs): ...

    # Geo queries
    @classmethod
    def geo_distance(cls, distance, **kwargs): ...
    @classmethod
    def geo_bounding_box(cls, **kwargs): ...
    @classmethod
    def geo_polygon(cls, **kwargs): ...
    @classmethod
    def geo_shape(cls, **kwargs): ...

    # Scoring queries
    @classmethod
    def function_score(cls, query=None, functions=None, **kwargs): ...
    @classmethod
    def script_score(cls, query, script, **kwargs): ...
# Alias for backward compatibility
Query = Q

Construct Elasticsearch aggregations for data analysis.
class A:
    """
    Base aggregation builder with a classmethod per aggregation type.
    """

    # Bucket aggregations
    @classmethod
    def terms(cls, field, **kwargs): ...
    @classmethod
    def date_histogram(cls, field, **kwargs): ...
    @classmethod
    def histogram(cls, field, **kwargs): ...
    @classmethod
    def range(cls, field, ranges, **kwargs): ...
    @classmethod
    def date_range(cls, field, ranges, **kwargs): ...
    @classmethod
    def nested(cls, path, **kwargs): ...
    @classmethod
    def reverse_nested(cls, **kwargs): ...
    @classmethod
    def children(cls, type, **kwargs): ...
    @classmethod
    def filter(cls, filter, **kwargs): ...
    @classmethod
    def filters(cls, filters, **kwargs): ...
    @classmethod
    def global_(cls, **kwargs): ...  # trailing underscore avoids the `global` keyword
    @classmethod
    def missing(cls, field, **kwargs): ...
    @classmethod
    def significant_terms(cls, field, **kwargs): ...

    # Metric aggregations
    @classmethod
    def cardinality(cls, field, **kwargs): ...
    @classmethod
    def avg(cls, field, **kwargs): ...
    @classmethod
    def sum(cls, field, **kwargs): ...
    @classmethod
    def min(cls, field, **kwargs): ...
    @classmethod
    def max(cls, field, **kwargs): ...
    @classmethod
    def stats(cls, field, **kwargs): ...
    @classmethod
    def extended_stats(cls, field, **kwargs): ...
    @classmethod
    def percentiles(cls, field, **kwargs): ...
    @classmethod
    def percentile_ranks(cls, field, values, **kwargs): ...
    @classmethod
    def top_hits(cls, **kwargs): ...
    @classmethod
    def geo_bounds(cls, field, **kwargs): ...
    @classmethod
    def geo_centroid(cls, field, **kwargs): ...
# Alias for backward compatibility
Agg = A

Define document structures and mappings.
class Document:
    """
    Base document class for modeling Elasticsearch documents.
    """

    class Index:
        """
        Configuration for the document's index.
        """
        name: Optional[str] = None
        settings: Optional[Dict] = None
        mappings: Optional[Dict] = None
        aliases: Optional[Dict] = None
        analyzers: Optional[Dict] = None

    class Meta:
        """
        Document metadata configuration.
        """
        doc_type: Optional[str] = None
        using: Optional[str] = None
        index: Optional[str] = None
        routing: Optional[str] = None

    def __init__(self, meta=None, **kwargs): ...

    # Persistence
    def save(self, **kwargs): ...
    def update(self, **kwargs): ...
    def delete(self, **kwargs): ...

    # Retrieval
    @classmethod
    def get(cls, id, **kwargs): ...
    @classmethod
    def mget(cls, docs, **kwargs): ...
    @classmethod
    def search(cls, **kwargs): ...

    def to_dict(self, skip_empty=True): ...

    # Create the index (mappings/settings) for this document class
    @classmethod
    def init(cls, index=None, using=None): ...
class AsyncDocument:
    """
    Async version of Document class.
    """
    # Same interface as Document but with async methods

    async def save(self, **kwargs): ...
    async def update(self, **kwargs): ...
    async def delete(self, **kwargs): ...

    @classmethod
    async def get(cls, id, **kwargs): ...
    @classmethod
    async def mget(cls, docs, **kwargs): ...
class InnerDoc:
"""
Base class for nested document objects.
"""
def __init__(self, **kwargs): ...
def to_dict(self, skip_empty=True): ...Define field mappings and types for documents.
# Text and Keyword Fields
class Text:
def __init__(self, analyzer=None, search_analyzer=None, **kwargs): ...
class Keyword:
def __init__(self, ignore_above=None, normalizer=None, **kwargs): ...
class SearchAsYouType:
def __init__(self, max_shingle_size=None, analyzer=None, **kwargs): ...
class Completion:
def __init__(self, analyzer=None, contexts=None, **kwargs): ...
# Numeric Fields
class Integer:
def __init__(self, coerce=None, ignore_malformed=None, **kwargs): ...
class Long:
def __init__(self, coerce=None, ignore_malformed=None, **kwargs): ...
class Float:
def __init__(self, coerce=None, ignore_malformed=None, **kwargs): ...
class Double:
def __init__(self, coerce=None, ignore_malformed=None, **kwargs): ...
class Boolean:
def __init__(self, **kwargs): ...
# Date Fields
class Date:
def __init__(self, format=None, locale=None, **kwargs): ...
class DateRange:
def __init__(self, format=None, **kwargs): ...
# Geographic Fields
class GeoPoint:
def __init__(self, ignore_malformed=None, ignore_z_value=None, **kwargs): ...
class GeoShape:
def __init__(self, tree=None, precision=None, **kwargs): ...
# Object and Nested Fields
class Object:
def __init__(self, properties=None, dynamic=None, **kwargs): ...
class Nested:
def __init__(self, properties=None, dynamic=None, **kwargs): ...
# Specialized Fields
class Binary:
def __init__(self, **kwargs): ...
class Join:
def __init__(self, relations, **kwargs): ...
class Percolator:
def __init__(self, **kwargs): ...
class Ip:
def __init__(self, ignore_malformed=None, **kwargs): ...
class TokenCount:
def __init__(self, analyzer, **kwargs): ...
# Vector Fields
class DenseVector:
def __init__(self, dims, index=None, similarity=None, **kwargs): ...
class SparseVector:
def __init__(self, **kwargs): ...
class RankFeature:
def __init__(self, positive_score_impact=None, **kwargs): ...
class RankFeatures:
def __init__(self, **kwargs): ...Manage indices and their configurations.
class Index:
    """
    Index management operations for a single named index.
    """

    def __init__(self, name, using='default'): ...

    # Analysis components
    def analyzer(self, name, **kwargs): ...
    def tokenizer(self, name, **kwargs): ...
    def token_filter(self, name, **kwargs): ...
    def char_filter(self, name, **kwargs): ...
    def normalizer(self, name, **kwargs): ...

    # Lifecycle
    def create(self, **kwargs): ...
    def delete(self, **kwargs): ...
    def exists(self): ...
    def close(self): ...
    def open(self): ...
    def clone(self, target, **kwargs): ...
    def refresh(self): ...
    def flush(self): ...
    def force_merge(self, **kwargs): ...

    # Mappings and settings
    def put_mapping(self, **kwargs): ...
    def get_mapping(self): ...
    def put_settings(self, **kwargs): ...
    def get_settings(self): ...

    # Aliases
    def put_alias(self, name, **kwargs): ...
    def get_alias(self, name=None): ...
    def delete_alias(self, name): ...
class AsyncIndex:
    """
    Async version of Index management.
    """
    # Same interface as Index but with async methods

    async def create(self, **kwargs): ...
    async def delete(self, **kwargs): ...
    async def exists(self): ...

from elasticsearch.dsl import Search, Q
# Create a search object
s = Search()

# Add a simple match query
s = s.query('match', title='python')

# Add filters
s = s.filter('term', status='published')
s = s.filter('range', publish_date={'gte': '2015-01-01'})

# Execute the search
response = s.execute()

# Process results
for hit in response:
    print(f"Title: {hit.title}")
    print(f"Score: {hit.meta.score}")

from elasticsearch.dsl import Search, Q
# Construct complex boolean query
q = Q('bool',
      must=[
          Q('match', title='elasticsearch'),
          Q('range', publish_date={'gte': '2020-01-01'})
      ],
      should=[
          Q('match', tags='python'),
          Q('match', tags='search')
      ],
      must_not=[
          Q('term', status='draft')
      ],
      filter=[
          Q('term', category='tutorial')
      ])

s = Search().query(q)
response = s.execute()

from elasticsearch.dsl import Search, A
s = Search()

# Add aggregations (s.aggs is modified in place, not reassigned)
s.aggs.bucket('categories', 'terms', field='category.keyword', size=10)
s.aggs.bucket('monthly_posts', 'date_histogram',
              field='publish_date',
              calendar_interval='month')

# Nested aggregation
# NOTE(review): 'categories' is defined twice; confirm whether the second
# definition (with the avg metric) is meant to replace the first.
s.aggs.bucket('categories', 'terms', field='category.keyword') \
    .metric('avg_score', 'avg', field='score')

# Execute and process aggregations
response = s.execute()
for bucket in response.aggregations.categories.buckets:
    print(f"Category: {bucket.key}, Count: {bucket.doc_count}")
    if hasattr(bucket, 'avg_score'):
        print(f"Average score: {bucket.avg_score.value}")

from elasticsearch.dsl import Document, InnerDoc, Text, Keyword, Date, Integer, Nested
class Comment(InnerDoc):
    author = Text()
    content = Text()
    created_at = Date()

class Article(Document):
    title = Text(analyzer='standard')
    content = Text()
    author = Keyword()
    publish_date = Date()
    tags = Keyword(multi=True)
    comments = Nested(Comment)
    view_count = Integer()

    class Index:
        name = 'articles'
        settings = {
            'number_of_shards': 1,
            'number_of_replicas': 0
        }

    def save(self, **kwargs):
        # Initialize view count on first save only — the original example
        # reset it to 0 unconditionally, wiping the count on every update.
        if self.view_count is None:
            self.view_count = 0
        return super().save(**kwargs)

    @classmethod
    def get_published(cls):
        """Get only published articles."""
        # NOTE(review): filters on a 'status' field that the mapping
        # above does not declare — confirm the field exists in the index.
        s = cls.search()
        s = s.filter('term', status='published')
        return s

# Initialize the index
Article.init()

# Create and save a document
article = Article(
    title='Getting Started with Elasticsearch',
    content='This is a comprehensive guide...',
    author='john_doe',
    publish_date='2024-01-01',
    tags=['elasticsearch', 'python', 'tutorial']
)
article.save()

# Search for articles
articles = Article.search().filter('term', author='john_doe')
for article in articles:
    print(f"Title: {article.title}")

from elasticsearch.dsl import Search, Q, A
# Search with highlighting
s = Search()
s = s.query('match', content='elasticsearch')
s = s.highlight('content', fragment_size=150, number_of_fragments=3)

# Add suggestions (the misspelled text demonstrates term suggestion)
s = s.suggest('title_suggestion', 'elasicsearch', term={'field': 'title'})

# Add script fields
s = s.script_fields(
    popularity_score={
        'script': {
            'source': 'doc["view_count"].value * doc["like_count"].value'
        }
    }
)

# Sorting
s = s.sort('-publish_date', {'view_count': {'order': 'desc'}})

# Pagination
s = s[10:20]  # Skip 10, take 10

response = s.execute()

# Process highlights
for hit in response:
    if hasattr(hit.meta, 'highlight'):
        for fragment in hit.meta.highlight.content:
            print(f"Highlight: {fragment}")
    # Access script fields
    if hasattr(hit.meta, 'script_fields'):
        print(f"Popularity: {hit.meta.script_fields.popularity_score}")

# Process suggestions
if hasattr(response, 'suggest'):
    for suggestion in response.suggest.title_suggestion:
        print(f"Original: {suggestion.text}")
        for option in suggestion.options:
            print(f"Suggestion: {option.text}")

from elasticsearch.dsl import MultiSearch, Search
# Create multiple searches
ms = MultiSearch()

# Add individual searches
s1 = Search().query('match', title='python')
s2 = Search().query('match', title='elasticsearch')
s3 = Search().filter('range', publish_date={'gte': '2024-01-01'})

ms = ms.add(s1)
ms = ms.add(s2)
ms = ms.add(s3)

# Execute all searches
responses = ms.execute()

# Process results (responses are in the order the searches were added)
for i, response in enumerate(responses):
    print(f"Search {i+1}: {response.hits.total.value} hits")
    for hit in response:
        print(f" - {hit.title}")

from elasticsearch.dsl import FacetedSearch, TermsFacet, DateHistogramFacet, RangeFacet
class ArticleSearch(FacetedSearch):
doc_types = [Article]
facets = {
'category': TermsFacet(field='category.keyword'),
'tags': TermsFacet(field='tags.keyword'),
'publish_year': DateHistogramFacet(
field='publish_date',
calendar_interval='year'
),
'view_ranges': RangeFacet(
field='view_count',
ranges=[
('low', (None, 100)),
('medium', (100, 1000)),
('high', (1000, None))
]
)
}
def search(self):
# Base query
s = super().search()
# Add default filters
s = s.filter('term', status='published')
return s
# Use faceted search
search = ArticleSearch('python tutorial', {
'category': ['programming'],
'view_ranges': ['high']
})
response = search.execute()
# Access facets
for facet_name, facet in response.facets.items():
print(f"{facet_name}:")
for bucket in facet:
print(f" {bucket[0]}: {bucket[1]}")Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch