CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch-dsl

High-level Python library for Elasticsearch providing an idiomatic way to write and manipulate queries.

Pending
Overview
Eval results
Files

docs/document-operations.md

Document Operations

Object-document mapping for Elasticsearch, providing automatic index management, CRUD operations, bulk processing, and lifecycle hooks. The Document class bridges Python objects and Elasticsearch documents, converting and validating field values according to their declared field types and providing convenient persistence methods.

Capabilities

Document Class Definition

Base class for creating Elasticsearch documents with field definitions, index configuration, and persistence methods.

class Document:
    """
    Base class for Elasticsearch documents.

    Attributes are automatically converted to appropriate Field instances
    based on their type annotations or assignments.

    Unless an ``index`` argument is passed explicitly, persistence methods
    target the index recorded in ``meta.index`` or, failing that, the one
    declared in the document's inner ``Index`` class.
    """
    def __init__(self, meta=None, **kwargs):
        """
        Initialize document instance.

        Args:
            meta (dict, optional): Document metadata (id, index, etc.)
            **kwargs: Field values for the document
        """

    def save(self, using=None, index=None, validate=True, skip_empty=True,
             return_doc_meta=False, **kwargs):
        """
        Save document to Elasticsearch.

        The whole document is (re)indexed; use ``update`` for a partial update.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            validate (bool): Whether to validate before saving
            skip_empty (bool): Skip empty fields
            return_doc_meta (bool): Return the full response metadata
                instead of only the result string
            **kwargs: Additional Elasticsearch index parameters

        Returns:
            str: Operation result, ``'created'`` or ``'updated'``. When
            ``return_doc_meta`` is True the full response metadata is
            returned instead.
        """

    def delete(self, using=None, index=None, **kwargs):
        """
        Delete document from Elasticsearch.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            **kwargs: Additional Elasticsearch delete parameters

        Returns:
            None. Failures are reported through exceptions raised by the
            underlying client, not through the return value.
        """

    def update(self, using=None, index=None, detect_noop=True,
               doc_as_upsert=False, refresh=False, retry_on_conflict=None,
               script=None, script_id=None, scripted_upsert=False,
               upsert=None, return_doc_meta=False, **kwargs):
        """
        Partially update document in Elasticsearch.

        The given field values are set on the instance and sent as a partial
        document, or the supplied script is executed server-side.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            detect_noop (bool): Detect if update is a no-op
            doc_as_upsert (bool): Use the partial document as the upsert
                document when the target does not exist yet
            refresh (bool or str): Refresh policy for the affected shards
            retry_on_conflict (int, optional): Number of retries on a
                version conflict
            script (str or dict, optional): Inline script source (or full
                script definition) to run instead of a partial document
            script_id (str, optional): ID of a stored script to run
            scripted_upsert (bool): Run the script even when the document
                does not exist yet
            upsert (dict, optional): Document to index when the target does
                not exist
            return_doc_meta (bool): Return the full response metadata
                instead of only the result string
            **kwargs: Fields to update, given as ``field_name=value``

        Returns:
            str: Operation result (for example ``'updated'`` or ``'noop'``).
            When ``return_doc_meta`` is True the full response metadata is
            returned instead.
        """

    @classmethod
    def get(cls, id, using=None, index=None, **kwargs):
        """
        Retrieve document by ID.

        Args:
            id: Document ID
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            **kwargs: Additional get parameters

        Returns:
            Document: Document instance

        Raises:
            NotFoundError: If document doesn't exist
        """

    @classmethod
    def mget(cls, docs, using=None, index=None, raise_on_error=True,
             missing="none", **kwargs):
        """
        Multi-get documents by IDs.

        Args:
            docs (list): List of document IDs or dicts with ID and other params
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            raise_on_error (bool): Raise an exception when the response
                reports an error for any requested document
            missing (str): What to do with documents that were not found:
                ``'none'`` puts ``None`` in their place, ``'raise'`` raises
                ``NotFoundError``, ``'skip'`` leaves them out of the result
            **kwargs: Additional mget parameters

        Returns:
            list: Document instances in the order requested (with ``None``
            entries for missing documents when ``missing='none'``)
        """

    @classmethod
    def search(cls, using=None, index=None):
        """
        Create Search instance for this document type.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override

        Returns:
            Search: Search instance configured for this document type
        """

    @classmethod
    def init(cls, index=None, using=None, **kwargs):
        """
        Create index and put mapping for this document.

        Args:
            index (str, optional): Index name override
            using (str, optional): Connection alias to use
            **kwargs: Additional index creation parameters
        """

    def to_dict(self, include_meta=False, skip_empty=True):
        """
        Convert document to dictionary.

        Args:
            include_meta (bool): Include document metadata, producing the
                action format expected by the bulk helpers
            skip_empty (bool): Skip empty fields

        Returns:
            dict: Document as dictionary
        """

    @classmethod
    def from_dict(cls, d):
        """
        Create document instance from dictionary.

        NOTE(review): could not confirm a public ``from_dict`` on the
        upstream ``Document`` class (raw hits are usually loaded through
        ``from_es``) -- verify against the installed version.

        Args:
            d (dict): Dictionary with document data

        Returns:
            Document: Document instance
        """

Async Document Operations

Asynchronous version of Document class for async/await operations.

class AsyncDocument:
    """
    Async version of Document class for async/await operations.

    Methods that talk to Elasticsearch are coroutines and must be awaited;
    purely local helpers (``search``, ``to_dict``) stay synchronous.
    """

    def __init__(self, meta=None, **kwargs):
        """
        Initialize document instance.

        Args:
            meta (dict, optional): Document metadata (id, index, etc.)
            **kwargs: Field values for the document
        """

    async def save(self, using=None, index=None, validate=True, skip_empty=True,
                   return_doc_meta=False, **kwargs):
        """
        Async save document to Elasticsearch.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            validate (bool): Whether to validate before saving
            skip_empty (bool): Skip empty fields
            return_doc_meta (bool): Return the full response metadata
                instead of only the result string
            **kwargs: Additional Elasticsearch index parameters

        Returns:
            str: Operation result, ``'created'`` or ``'updated'``. When
            ``return_doc_meta`` is True the full response metadata is
            returned instead.
        """

    async def delete(self, using=None, index=None, **kwargs):
        """
        Async delete document from Elasticsearch.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            **kwargs: Additional Elasticsearch delete parameters

        Returns:
            None. Failures are reported through exceptions raised by the
            underlying client, not through the return value.
        """

    async def update(self, using=None, index=None, detect_noop=True,
                     doc_as_upsert=False, refresh=False, retry_on_conflict=None,
                     script=None, script_id=None, scripted_upsert=False,
                     upsert=None, return_doc_meta=False, **kwargs):
        """
        Async partial update of the document in Elasticsearch.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            detect_noop (bool): Detect if update is a no-op
            doc_as_upsert (bool): Use the partial document as the upsert
                document when the target does not exist yet
            refresh (bool or str): Refresh policy for the affected shards
            retry_on_conflict (int, optional): Number of retries on a
                version conflict
            script (str or dict, optional): Inline script source (or full
                script definition) to run instead of a partial document
            script_id (str, optional): ID of a stored script to run
            scripted_upsert (bool): Run the script even when the document
                does not exist yet
            upsert (dict, optional): Document to index when the target does
                not exist
            return_doc_meta (bool): Return the full response metadata
                instead of only the result string
            **kwargs: Fields to update, given as ``field_name=value``

        Returns:
            str: Operation result (for example ``'updated'`` or ``'noop'``).
            When ``return_doc_meta`` is True the full response metadata is
            returned instead.
        """

    @classmethod
    async def get(cls, id, using=None, index=None, **kwargs):
        """
        Async retrieve document by ID.

        Args:
            id: Document ID
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            **kwargs: Additional get parameters

        Returns:
            AsyncDocument: Document instance

        Raises:
            NotFoundError: If document doesn't exist
        """

    @classmethod
    async def mget(cls, docs, using=None, index=None, raise_on_error=True,
                   missing="none", **kwargs):
        """
        Async multi-get documents by IDs.

        Args:
            docs (list): List of document IDs or dicts with ID and other params
            using (str, optional): Connection alias to use
            index (str, optional): Index name override
            raise_on_error (bool): Raise an exception when the response
                reports an error for any requested document
            missing (str): What to do with documents that were not found:
                ``'none'`` puts ``None`` in their place, ``'raise'`` raises
                ``NotFoundError``, ``'skip'`` leaves them out of the result
            **kwargs: Additional mget parameters

        Returns:
            list: AsyncDocument instances in the order requested (with
            ``None`` entries for missing documents when ``missing='none'``)
        """

    @classmethod
    def search(cls, using=None, index=None):
        """
        Create an async search object for this document type.

        Building the search performs no I/O, so this method is not a
        coroutine; only executing the returned object is awaited.

        Args:
            using (str, optional): Connection alias to use
            index (str, optional): Index name override

        Returns:
            AsyncSearch: Search instance configured for this document type
        """

    @classmethod
    async def init(cls, index=None, using=None, **kwargs):
        """
        Async create index and put mapping for this document.

        Args:
            index (str, optional): Index name override
            using (str, optional): Connection alias to use
            **kwargs: Additional index creation parameters
        """

    def to_dict(self, include_meta=False, skip_empty=True):
        """
        Convert document to dictionary (synchronous, no I/O).

        Args:
            include_meta (bool): Include document metadata
            skip_empty (bool): Skip empty fields

        Returns:
            dict: Document as dictionary
        """

Inner Document Definition

InnerDoc is the base class for object and nested structures that are embedded inside other documents.

class InnerDoc:
    """
    Base class for nested document definitions.

    Used to define object and nested field structures within documents.
    A subclass declares its fields as class attributes and is then passed
    to an ``Object(...)`` or ``Nested(...)`` field on the parent document.
    It is embedded in its parent rather than stored on its own, so it
    exposes no persistence methods here.
    """
    
    def __init__(self, **kwargs):
        """
        Initialize inner document.
        
        Args:
            **kwargs: Field values for the inner document, keyed by field name
        """
    
    def to_dict(self, skip_empty=True):
        """
        Convert inner document to dictionary.
        
        Args:
            skip_empty (bool): Skip empty fields
            
        Returns:
            dict: Inner document as dictionary
        """

Index Configuration

Configure index settings and mappings within Document classes.

class Index:
    """
    Index configuration class used within Document definitions.

    Declared as an inner class named ``Index`` inside a Document subclass;
    its attributes describe the index that backs that document.
    
    Example:
        class MyDoc(Document):
            title = Text()
            
            class Index:
                name = 'my_index'
                settings = {
                    'number_of_shards': 2,
                    'number_of_replicas': 1
                }
    """
    name: str  # Index name the document is stored in
    settings: dict  # Index settings (e.g. number_of_shards, number_of_replicas)
    aliases: dict  # Index aliases

Meta Information

Document metadata handling for ID, index, routing, and other Elasticsearch document properties.

class Meta:
    """
    Document metadata container.
    
    Accessible via document.meta property (for example ``doc.meta.id``).
    Holds values that describe the stored document rather than its fields.
    """
    id: str  # Document ID
    index: str  # Document index
    doc_type: str  # Document type (deprecated in ES 7+)
    routing: str  # Document routing
    parent: str  # Parent document ID (for parent-child)
    version: int  # Document version
    seq_no: int  # Sequence number
    primary_term: int  # Primary term
    score: float  # Search score (when from search results)

Update by Query Operations

Update multiple documents matching a query.

class UpdateByQuery:
    """
    Update documents matching a query.

    The builder methods (``script``, ``query``, ``filter``, ``params``)
    return an UpdateByQuery so calls can be chained; nothing is sent to
    Elasticsearch until ``execute`` is called.

    NOTE(review): the docstrings below say the *current* instance is
    returned; upstream search-style builders usually return a modified
    copy instead -- confirm before relying on in-place mutation.
    """
    
    def __init__(self, using=None, index=None):
        """
        Initialize update by query operation.
        
        Args:
            using (str, optional): Connection alias to use
            index (str or list, optional): Index name(s) to update
        """
    
    def script(self, **kwargs):
        """
        Set update script.
        
        Args:
            **kwargs: Script parameters
            
        Returns:
            UpdateByQuery: Current instance with script applied
        """
    
    def query(self, query, **kwargs):
        """
        Set query to match documents for update.
        
        Args:
            query (str or Query): Query to match documents
            **kwargs: Query parameters if query is a string
            
        Returns:
            UpdateByQuery: Current instance with query applied
        """
    
    def filter(self, query, **kwargs):
        """
        Add filter to update by query.
        
        Args:
            query (str or Query): Filter query
            **kwargs: Filter parameters if query is a string
            
        Returns:
            UpdateByQuery: Current instance with filter applied
        """
    
    def execute(self):
        """
        Execute update by query operation.
        
        Returns:
            dict: Update by query response with statistics
        """
    
    def params(self, **kwargs):
        """
        Set update by query parameters.
        
        Args:
            **kwargs: Update parameters; the recognised keys are listed
                under "Supported keys" below
            
        Supported keys:
            conflicts (str): How to handle conflicts ('abort' or 'proceed')
            refresh (bool or str): Refresh policy
            timeout (str): Operation timeout
            wait_for_active_shards (str): Wait for active shards
            wait_for_completion (bool): Wait for completion
            requests_per_second (int): Throttling rate
            scroll_size (int): Scroll batch size
            pipeline (str): Ingest pipeline to use
            
        Returns:
            UpdateByQuery: Current instance with parameters applied
        """

class AsyncUpdateByQuery:
    """
    Async version of UpdateByQuery for async/await operations.

    Only ``execute`` is a coroutine; the builder methods are plain
    synchronous calls documented as equivalent to their UpdateByQuery
    counterparts.
    """
    
    def __init__(self, using=None, index=None):
        """
        Initialize async update by query operation.

        Args:
            using (str, optional): Connection alias to use
            index (str or list, optional): Index name(s) to update
        """
    
    def script(self, **kwargs):
        """
        Set update script (same as UpdateByQuery).

        Args:
            **kwargs: Script parameters
        """
    
    def query(self, query, **kwargs):
        """
        Set query to match documents (same as UpdateByQuery).

        Args:
            query (str or Query): Query to match documents
            **kwargs: Query parameters if query is a string
        """
    
    def filter(self, query, **kwargs):
        """
        Add filter (same as UpdateByQuery).

        Args:
            query (str or Query): Filter query
            **kwargs: Filter parameters if query is a string
        """
    
    def params(self, **kwargs):
        """
        Set parameters (same as UpdateByQuery).

        Args:
            **kwargs: Update parameters (conflicts, refresh, timeout, ...)
        """
    
    async def execute(self):
        """
        Async execute update by query operation.

        Must be awaited.
        
        Returns:
            dict: Update by query response with statistics
        """

Delete by Query Operations

Delete multiple documents matching a query.

class DeleteByQuery:
    """
    Delete documents matching a query.

    The builder methods (``query``, ``filter``, ``params``) return a
    DeleteByQuery so calls can be chained; nothing is deleted until
    ``execute`` is called.

    NOTE(review): could not confirm that upstream elasticsearch_dsl
    exports a class with this name (delete-by-query is normally reached
    through ``Search.delete()``) -- verify against the installed version.
    """
    
    def __init__(self, using=None, index=None):
        """
        Initialize delete by query operation.
        
        Args:
            using (str, optional): Connection alias to use
            index (str or list, optional): Index name(s) to delete from
        """
    
    def query(self, query, **kwargs):
        """
        Set query to match documents for deletion.
        
        Args:
            query (str or Query): Query to match documents
            **kwargs: Query parameters if query is a string
            
        Returns:
            DeleteByQuery: Current instance with query applied
        """
    
    def filter(self, query, **kwargs):
        """
        Add filter to delete by query.
        
        Args:
            query (str or Query): Filter query
            **kwargs: Filter parameters if query is a string
            
        Returns:
            DeleteByQuery: Current instance with filter applied
        """
    
    def execute(self):
        """
        Execute delete by query operation.
        
        Returns:
            dict: Delete by query response with statistics
        """
    
    def params(self, **kwargs):
        """
        Set delete by query parameters.
        
        Args:
            **kwargs: Delete parameters; the recognised keys are listed
                under "Supported keys" below
            
        Supported keys:
            conflicts (str): How to handle conflicts ('abort' or 'proceed')
            refresh (bool or str): Refresh policy
            timeout (str): Operation timeout
            wait_for_active_shards (str): Wait for active shards
            wait_for_completion (bool): Wait for completion
            requests_per_second (int): Throttling rate
            scroll_size (int): Scroll batch size
            
        Returns:
            DeleteByQuery: Current instance with parameters applied
        """

class AsyncDeleteByQuery:
    """
    Async version of DeleteByQuery for async/await operations.

    Only ``execute`` is a coroutine; the builder methods are plain
    synchronous calls documented as equivalent to their DeleteByQuery
    counterparts.
    """
    
    def __init__(self, using=None, index=None):
        """
        Initialize async delete by query operation.

        Args:
            using (str, optional): Connection alias to use
            index (str or list, optional): Index name(s) to delete from
        """
    
    def query(self, query, **kwargs):
        """
        Set query to match documents (same as DeleteByQuery).

        Args:
            query (str or Query): Query to match documents
            **kwargs: Query parameters if query is a string
        """
    
    def filter(self, query, **kwargs):
        """
        Add filter (same as DeleteByQuery).

        Args:
            query (str or Query): Filter query
            **kwargs: Filter parameters if query is a string
        """
    
    def params(self, **kwargs):
        """
        Set parameters (same as DeleteByQuery).

        Args:
            **kwargs: Delete parameters (conflicts, refresh, timeout, ...)
        """
    
    async def execute(self):
        """
        Async execute delete by query operation.

        Must be awaited.
        
        Returns:
            dict: Delete by query response with statistics
        """

Reindex Operations

Reindex documents from source to destination index.

class Reindex:
    """
    Reindex documents from source to destination.

    ``source`` and ``dest`` describe where documents are read from and
    written to, ``script`` optionally transforms them on the way, and
    ``execute`` runs the operation.

    NOTE(review): could not confirm that upstream elasticsearch_dsl
    exports a class with this name -- verify against the installed
    version before relying on it.
    """
    
    def __init__(self, using=None):
        """
        Initialize reindex operation.
        
        Args:
            using (str, optional): Connection alias to use
        """
    
    def source(self, **kwargs):
        """
        Configure source for reindex operation.
        
        Args:
            **kwargs: Source configuration; the recognised keys are listed
                under "Supported keys" below
            
        Supported keys:
            index (str or list): Source index name(s)
            query (dict): Query to filter source documents
            sort (list): Sort order for source documents
            _source (list or dict): Source field filtering
            size (int): Batch size for reindexing
            
        Returns:
            Reindex: Current instance with source configured
        """
    
    def dest(self, **kwargs):
        """
        Configure destination for reindex operation.
        
        Args:
            **kwargs: Destination configuration; the recognised keys are
                listed under "Supported keys" below
            
        Supported keys:
            index (str): Destination index name
            type (str): Destination document type (deprecated)
            routing (str): Routing for destination documents
            op_type (str): Operation type ('index' or 'create')
            version_type (str): Version type for conflicts
            pipeline (str): Ingest pipeline to use
            
        Returns:
            Reindex: Current instance with destination configured
        """
    
    def script(self, **kwargs):
        """
        Set reindex script for document transformation.
        
        Args:
            **kwargs: Script configuration
            
        Returns:
            Reindex: Current instance with script applied
        """
    
    def execute(self):
        """
        Execute reindex operation.
        
        Returns:
            dict: Reindex response with statistics
        """
    
    def params(self, **kwargs):
        """
        Set reindex parameters.
        
        Args:
            **kwargs: Reindex parameters; the recognised keys are listed
                under "Supported keys" below
            
        Supported keys:
            conflicts (str): How to handle conflicts ('abort' or 'proceed')
            refresh (bool or str): Refresh policy
            timeout (str): Operation timeout
            wait_for_active_shards (str): Wait for active shards
            wait_for_completion (bool): Wait for completion
            requests_per_second (int): Throttling rate
            
        Returns:
            Reindex: Current instance with parameters applied
        """

class AsyncReindex:
    """
    Async version of Reindex for async/await operations.

    Only ``execute`` is a coroutine; the builder methods are plain
    synchronous calls documented as equivalent to their Reindex
    counterparts.
    """
    
    def __init__(self, using=None):
        """
        Initialize async reindex operation.

        Args:
            using (str, optional): Connection alias to use
        """
    
    def source(self, **kwargs):
        """
        Configure source (same as Reindex).

        Args:
            **kwargs: Source configuration (index, query, sort, ...)
        """
    
    def dest(self, **kwargs):
        """
        Configure destination (same as Reindex).

        Args:
            **kwargs: Destination configuration (index, routing, op_type, ...)
        """
    
    def script(self, **kwargs):
        """
        Set script (same as Reindex).

        Args:
            **kwargs: Script configuration
        """
    
    def params(self, **kwargs):
        """
        Set parameters (same as Reindex).

        Args:
            **kwargs: Reindex parameters (conflicts, refresh, timeout, ...)
        """
    
    async def execute(self):
        """
        Async execute reindex operation.

        Must be awaited.
        
        Returns:
            dict: Reindex response with statistics
        """

Usage Examples

Basic Document Definition and Operations

from elasticsearch_dsl import Document, Text, Keyword, Date, Integer, connections

# Configure the default connection.
# The 8.x Elasticsearch client requires a full URL (scheme, host and port);
# a bare 'localhost:9200' is rejected with a ValueError.
connections.create_connection(hosts=['http://localhost:9200'])

class BlogPost(Document):
    title = Text(analyzer='snowball')
    content = Text()
    author = Keyword()
    published = Date()
    views = Integer()

    class Index:
        name = 'blog'
        settings = {
            'number_of_shards': 2,
        }

# Create index and mapping
BlogPost.init()

# Create and save document (no id given, so Elasticsearch generates one)
post = BlogPost(
    title='My First Post',
    content='This is the content of my first blog post...',
    author='john_doe',
    published='2023-10-01T10:30:00',
    views=0
)
post.save()

# Retrieve document by the id that save() stored on post.meta
retrieved_post = BlogPost.get(id=post.meta.id)
print(f"Post: {retrieved_post.title} by {retrieved_post.author}")

# Update document: save() re-indexes the whole document.
# For a partial update use retrieved_post.update(views=10) instead.
retrieved_post.views = 10
retrieved_post.save()

# Delete document
retrieved_post.delete()

Nested and Object Fields

# Keyword and Date are used by the classes below and must be imported too;
# without them the class bodies raise NameError.
from elasticsearch_dsl import Date, Document, InnerDoc, Keyword, Nested, Object, Text

class Address(InnerDoc):
    street = Text()
    city = Text()
    country = Keyword()

class Comment(InnerDoc):
    author = Keyword()
    content = Text()
    timestamp = Date()

class User(Document):
    name = Text()
    email = Keyword()
    address = Object(Address)  # Single embedded object (mapped as `object`)
    comments = Nested(Comment)  # List of objects mapped as `nested`

    class Index:
        name = 'users'

# Create user with nested data
user = User(
    name='John Doe',
    email='john@example.com',
    address=Address(
        street='123 Main St',
        city='New York',
        country='USA'
    ),
    comments=[
        Comment(
            author='friend1',
            content='Great profile!',
            timestamp='2023-10-01T12:00:00'
        )
    ]
)
user.save()

Bulk Operations

from elasticsearch_dsl import Document, Text, connections
from elasticsearch.helpers import bulk


class Article(Document):
    title = Text()
    content = Text()

    class Index:
        name = 'articles'


# Build 100 unsaved Article instances in memory.
articles = [
    Article(title=f'Article {i}', content=f'Content for article {i}')
    for i in range(100)
]

# Serialise each article together with its metadata; this is the action
# format consumed by the elasticsearch-py bulk helper.
actions = [doc.to_dict(include_meta=True) for doc in articles]

# Send every action through the default connection in one bulk call.
bulk(connections.get_connection(), actions)

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch-dsl

docs

aggregations.md

analysis.md

connections.md

document-operations.md

field-types.md

index-management.md

index.md

search-queries.md

tile.json