High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.
—
Object-relational mapping for Elasticsearch documents providing automatic index management, CRUD operations, bulk processing, and lifecycle hooks. The Document class bridges Python objects and Elasticsearch documents while maintaining type safety and providing convenient persistence methods.
Base class for creating Elasticsearch documents with field definitions, index configuration, and persistence methods.
class Document:
"""
Base class for Elasticsearch documents.
Attributes are automatically converted to appropriate Field instances
based on their type annotations or assignments.
"""
def __init__(self, meta=None, **kwargs):
"""
Initialize document instance.
Args:
meta (dict, optional): Document metadata (id, index, etc.)
**kwargs: Field values for the document
"""
def save(self, using=None, index=None, validate=True, skip_empty=True, **kwargs):
"""
Save document to Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
validate (bool): Whether to validate before saving
skip_empty (bool): Skip empty fields
**kwargs: Additional Elasticsearch index parameters
Returns:
bool: True if document was created, False if updated
"""
def delete(self, using=None, index=None, **kwargs):
"""
Delete document from Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
**kwargs: Additional Elasticsearch delete parameters
Returns:
bool: True if document was deleted
"""
def update(self, using=None, index=None, detect_noop=True, **kwargs):
"""
Update document in Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
detect_noop (bool): Detect if update is a no-op
**kwargs: Additional update parameters
Returns:
dict: Update response from Elasticsearch
"""
@classmethod
def get(cls, id, using=None, index=None, **kwargs):
"""
Retrieve document by ID.
Args:
id: Document ID
using (str, optional): Connection alias to use
index (str, optional): Index name override
**kwargs: Additional get parameters
Returns:
Document: Document instance
Raises:
NotFoundError: If document doesn't exist
"""
@classmethod
def mget(cls, docs, using=None, index=None, raise_on_error=True, **kwargs):
"""
Multi-get documents by IDs.
Args:
docs (list): List of document IDs or dicts with ID and other params
using (str, optional): Connection alias to use
index (str, optional): Index name override
raise_on_error (bool): Raise exception on missing documents
**kwargs: Additional mget parameters
Returns:
list: List of Document instances (None for missing docs if not raising)
"""
@classmethod
def search(cls, using=None, index=None):
"""
Create Search instance for this document type.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
Returns:
Search: Search instance configured for this document type
"""
@classmethod
def init(cls, index=None, using=None, **kwargs):
"""
Create index and put mapping for this document.
Args:
index (str, optional): Index name override
using (str, optional): Connection alias to use
**kwargs: Additional index creation parameters
"""
def to_dict(self, include_meta=False, skip_empty=True):
"""
Convert document to dictionary.
Args:
include_meta (bool): Include document metadata
skip_empty (bool): Skip empty fields
Returns:
dict: Document as dictionary
"""
@classmethod
def from_dict(cls, d):
"""
Create document instance from dictionary.
Args:
d (dict): Dictionary with document data
Returns:
Document: Document instance
"""

Asynchronous version of Document class for async/await operations.
class AsyncDocument:
"""
Async version of Document class for async/await operations.
"""
async def save(self, using=None, index=None, validate=True, skip_empty=True, **kwargs):
"""
Async save document to Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
validate (bool): Whether to validate before saving
skip_empty (bool): Skip empty fields
**kwargs: Additional Elasticsearch index parameters
Returns:
bool: True if document was created, False if updated
"""
async def delete(self, using=None, index=None, **kwargs):
"""
Async delete document from Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
**kwargs: Additional Elasticsearch delete parameters
Returns:
bool: True if document was deleted
"""
async def update(self, using=None, index=None, detect_noop=True, **kwargs):
"""
Async update document in Elasticsearch.
Args:
using (str, optional): Connection alias to use
index (str, optional): Index name override
detect_noop (bool): Detect if update is a no-op
**kwargs: Additional update parameters
Returns:
dict: Update response from Elasticsearch
"""
@classmethod
async def get(cls, id, using=None, index=None, **kwargs):
"""
Async retrieve document by ID.
Args:
id: Document ID
using (str, optional): Connection alias to use
index (str, optional): Index name override
**kwargs: Additional get parameters
Returns:
AsyncDocument: Document instance
Raises:
NotFoundError: If document doesn't exist
"""
@classmethod
async def mget(cls, docs, using=None, index=None, raise_on_error=True, **kwargs):
"""
Async multi-get documents by IDs.
Args:
docs (list): List of document IDs or dicts with ID and other params
using (str, optional): Connection alias to use
index (str, optional): Index name override
raise_on_error (bool): Raise exception on missing documents
**kwargs: Additional mget parameters
Returns:
list: List of AsyncDocument instances
"""
@classmethod
async def init(cls, index=None, using=None, **kwargs):
"""
Async create index and put mapping for this document.
Args:
index (str, optional): Index name override
using (str, optional): Connection alias to use
**kwargs: Additional index creation parameters
"""

For nested document definitions within other documents.
class InnerDoc:
"""
Base class for nested document definitions.
Used to define object and nested field structures within documents.
"""
def __init__(self, **kwargs):
"""
Initialize inner document.
Args:
**kwargs: Field values for the inner document
"""
def to_dict(self, skip_empty=True):
"""
Convert inner document to dictionary.
Args:
skip_empty (bool): Skip empty fields
Returns:
dict: Inner document as dictionary
"""

Configure index settings and mappings within Document classes.
class Index:
"""
Index configuration class used within Document definitions.
Example:
class MyDoc(Document):
title = Text()
class Index:
name = 'my_index'
settings = {
'number_of_shards': 2,
'number_of_replicas': 1
}
"""
name: str # Index name
settings: dict # Index settings
aliases: dict # Index aliases

Document metadata handling for ID, index, routing, and other Elasticsearch document properties.
class Meta:
"""
Document metadata container.
Accessible via document.meta property.
"""
id: str # Document ID
index: str # Document index
doc_type: str # Document type (deprecated in ES 7+)
routing: str # Document routing
parent: str # Parent document ID (for parent-child)
version: int # Document version
seq_no: int # Sequence number
primary_term: int # Primary term
score: float # Search score (when from search results)

Update multiple documents matching a query.
class UpdateByQuery:
"""
Update documents matching a query.
"""
def __init__(self, using=None, index=None):
"""
Initialize update by query operation.
Args:
using (str, optional): Connection alias to use
index (str or list, optional): Index name(s) to update
"""
def script(self, **kwargs):
"""
Set update script.
Args:
**kwargs: Script parameters
Returns:
UpdateByQuery: Current instance with script applied
"""
def query(self, query, **kwargs):
"""
Set query to match documents for update.
Args:
query (str or Query): Query to match documents
**kwargs: Query parameters if query is a string
Returns:
UpdateByQuery: Current instance with query applied
"""
def filter(self, query, **kwargs):
"""
Add filter to update by query.
Args:
query (str or Query): Filter query
**kwargs: Filter parameters if query is a string
Returns:
UpdateByQuery: Current instance with filter applied
"""
def execute(self):
"""
Execute update by query operation.
Returns:
dict: Update by query response with statistics
"""
def params(self, **kwargs):
"""
Set update by query parameters.
Args:
**kwargs: Update parameters
Parameters:
conflicts (str): How to handle conflicts ('abort' or 'proceed')
refresh (bool or str): Refresh policy
timeout (str): Operation timeout
wait_for_active_shards (str): Wait for active shards
wait_for_completion (bool): Wait for completion
requests_per_second (int): Throttling rate
scroll_size (int): Scroll batch size
pipeline (str): Ingest pipeline to use
Returns:
UpdateByQuery: Current instance with parameters applied
"""
class AsyncUpdateByQuery:
"""
Async version of UpdateByQuery for async/await operations.
"""
def __init__(self, using=None, index=None):
"""Initialize async update by query operation."""
def script(self, **kwargs):
"""Set update script (same as UpdateByQuery)."""
def query(self, query, **kwargs):
"""Set query to match documents (same as UpdateByQuery)."""
def filter(self, query, **kwargs):
"""Add filter (same as UpdateByQuery)."""
def params(self, **kwargs):
"""Set parameters (same as UpdateByQuery)."""
async def execute(self):
"""
Async execute update by query operation.
Returns:
dict: Update by query response with statistics
"""

Delete multiple documents matching a query.
class DeleteByQuery:
"""
Delete documents matching a query.
"""
def __init__(self, using=None, index=None):
"""
Initialize delete by query operation.
Args:
using (str, optional): Connection alias to use
index (str or list, optional): Index name(s) to delete from
"""
def query(self, query, **kwargs):
"""
Set query to match documents for deletion.
Args:
query (str or Query): Query to match documents
**kwargs: Query parameters if query is a string
Returns:
DeleteByQuery: Current instance with query applied
"""
def filter(self, query, **kwargs):
"""
Add filter to delete by query.
Args:
query (str or Query): Filter query
**kwargs: Filter parameters if query is a string
Returns:
DeleteByQuery: Current instance with filter applied
"""
def execute(self):
"""
Execute delete by query operation.
Returns:
dict: Delete by query response with statistics
"""
def params(self, **kwargs):
"""
Set delete by query parameters.
Args:
**kwargs: Delete parameters
Parameters:
conflicts (str): How to handle conflicts ('abort' or 'proceed')
refresh (bool or str): Refresh policy
timeout (str): Operation timeout
wait_for_active_shards (str): Wait for active shards
wait_for_completion (bool): Wait for completion
requests_per_second (int): Throttling rate
scroll_size (int): Scroll batch size
Returns:
DeleteByQuery: Current instance with parameters applied
"""
class AsyncDeleteByQuery:
"""
Async version of DeleteByQuery for async/await operations.
"""
def __init__(self, using=None, index=None):
"""Initialize async delete by query operation."""
def query(self, query, **kwargs):
"""Set query to match documents (same as DeleteByQuery)."""
def filter(self, query, **kwargs):
"""Add filter (same as DeleteByQuery)."""
def params(self, **kwargs):
"""Set parameters (same as DeleteByQuery)."""
async def execute(self):
"""
Async execute delete by query operation.
Returns:
dict: Delete by query response with statistics
"""

Reindex documents from source to destination index.
class Reindex:
"""
Reindex documents from source to destination.
"""
def __init__(self, using=None):
"""
Initialize reindex operation.
Args:
using (str, optional): Connection alias to use
"""
def source(self, **kwargs):
"""
Configure source for reindex operation.
Args:
**kwargs: Source configuration
Parameters:
index (str or list): Source index name(s)
query (dict): Query to filter source documents
sort (list): Sort order for source documents
_source (list or dict): Source field filtering
size (int): Batch size for reindexing
Returns:
Reindex: Current instance with source configured
"""
def dest(self, **kwargs):
"""
Configure destination for reindex operation.
Args:
**kwargs: Destination configuration
Parameters:
index (str): Destination index name
type (str): Destination document type (deprecated)
routing (str): Routing for destination documents
op_type (str): Operation type ('index' or 'create')
version_type (str): Version type for conflicts
pipeline (str): Ingest pipeline to use
Returns:
Reindex: Current instance with destination configured
"""
def script(self, **kwargs):
"""
Set reindex script for document transformation.
Args:
**kwargs: Script configuration
Returns:
Reindex: Current instance with script applied
"""
def execute(self):
"""
Execute reindex operation.
Returns:
dict: Reindex response with statistics
"""
def params(self, **kwargs):
"""
Set reindex parameters.
Args:
**kwargs: Reindex parameters
Parameters:
conflicts (str): How to handle conflicts ('abort' or 'proceed')
refresh (bool or str): Refresh policy
timeout (str): Operation timeout
wait_for_active_shards (str): Wait for active shards
wait_for_completion (bool): Wait for completion
requests_per_second (int): Throttling rate
Returns:
Reindex: Current instance with parameters applied
"""
class AsyncReindex:
"""
Async version of Reindex for async/await operations.
"""
def __init__(self, using=None):
"""Initialize async reindex operation."""
def source(self, **kwargs):
"""Configure source (same as Reindex)."""
def dest(self, **kwargs):
"""Configure destination (same as Reindex)."""
def script(self, **kwargs):
"""Set script (same as Reindex)."""
def params(self, **kwargs):
"""Set parameters (same as Reindex)."""
async def execute(self):
"""
Async execute reindex operation.
Returns:
dict: Reindex response with statistics
"""

from elasticsearch_dsl import Document, Text, Keyword, Date, Integer, connections
# Configure connection
connections.create_connection(hosts=['localhost:9200'])
class BlogPost(Document):
title = Text(analyzer='snowball')
content = Text()
author = Keyword()
published = Date()
views = Integer()
class Index:
name = 'blog'
settings = {
'number_of_shards': 2,
}
# Create index and mapping
BlogPost.init()
# Create and save document
post = BlogPost(
title='My First Post',
content='This is the content of my first blog post...',
author='john_doe',
published='2023-10-01T10:30:00',
views=0
)
post.save()
# Retrieve document
retrieved_post = BlogPost.get(id=post.meta.id)
print(f"Post: {retrieved_post.title} by {retrieved_post.author}")
# Update document
retrieved_post.views = 10
retrieved_post.save()
# Delete document
retrieved_post.delete()

from elasticsearch_dsl import Document, Text, Keyword, Date, Object, Nested, InnerDoc
class Address(InnerDoc):
street = Text()
city = Text()
country = Keyword()
class Comment(InnerDoc):
author = Keyword()
content = Text()
timestamp = Date()
class User(Document):
name = Text()
email = Keyword()
address = Object(Address) # Single nested object
comments = Nested(Comment) # Array of nested objects
class Index:
name = 'users'
# Create user with nested data
user = User(
name='John Doe',
email='john@example.com',
address=Address(
street='123 Main St',
city='New York',
country='USA'
),
comments=[
Comment(
author='friend1',
content='Great profile!',
timestamp='2023-10-01T12:00:00'
)
]
)
user.save()

from elasticsearch_dsl import Document, Text, connections
from elasticsearch.helpers import bulk
class Article(Document):
title = Text()
content = Text()
class Index:
name = 'articles'
# Bulk create documents
articles = [
Article(title=f'Article {i}', content=f'Content for article {i}')
for i in range(100)
]
# Bulk save using elasticsearch-py helper
actions = [
article.to_dict(include_meta=True)
for article in articles
]
bulk(connections.get_connection(), actions)

Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch-dsl