CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-opensearch-py

Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities

Pending
Overview
Eval results
Files

docs/document-modeling.md

Document Modeling

The opensearch-py DSL offers ORM-style document modeling: field definitions, automatic mapping generation, and validation for handling structured data in OpenSearch. It provides a Pythonic way to define document schemas and interact with OpenSearch indices.

Capabilities

Document Base Class

Base class for creating document models with ORM-like functionality.

class Document:
    """
    Base class for document models persisted in an OpenSearch index.

    API-reference stub: the method bodies are intentionally empty; only the
    signatures and docstrings carry information.
    """

    def __init__(self, **kwargs):
        """
        Initialize document instance with field values.

        Parameters:
        - **kwargs: Field values for the document
        """

    def save(self, using=None, index=None, **kwargs):
        """
        Save document to OpenSearch, creating it or overwriting an existing one.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name (uses the model's inner
          ``class Index`` name if not provided)
        - validate (bool, optional): Run field and document validation before
          saving (default True)
        - skip_empty (bool, optional): Skip fields with empty values when
          serializing (default True)
        - return_doc_meta (bool, optional): Return the full response metadata
          instead of just the result string (default False)
        - refresh (str/bool, optional): Refresh policy
        - routing (str, optional): Routing value
        - pipeline (str, optional): Ingest pipeline

        Returns:
        str: Operation result ('created' or 'updated'), or the full response
        metadata when return_doc_meta=True. The instance's ``meta`` attributes
        are refreshed in place from the response.
        """

    def update(self, using=None, index=None, **kwargs):
        """
        Partially update document in OpenSearch.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name
        - refresh (str/bool, optional): Refresh policy
        - retry_on_conflict (int, optional): Retry on version conflict
        - return_doc_meta (bool, optional): Return the full response metadata
          instead of just the result string (default False)
        - **kwargs: Remaining keyword arguments are the field values to change

        Returns:
        str: Operation result (for example 'updated' or 'noop'), or the full
        response metadata when return_doc_meta=True
        """

    def delete(self, using=None, index=None, **kwargs):
        """
        Delete document from OpenSearch.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name
        - refresh (str/bool, optional): Refresh policy

        Returns:
        None
        """

    @classmethod
    def get(cls, id, using=None, index=None, **kwargs):
        """
        Retrieve document by ID.

        Parameters:
        - id: Document ID
        - using: OpenSearch client instance
        - index (str, optional): Index name
        - **kwargs: Extra arguments passed to the underlying get request

        Returns:
        Document instance, or None when a not-found response is ignored
        (for example by passing ignore=404)

        Raises:
        NotFoundError: If document doesn't exist
        """

    @classmethod
    def mget(cls, docs, using=None, index=None, **kwargs):
        """
        Retrieve multiple documents by ID.

        Parameters:
        - docs: List of document IDs or dicts with id/index
        - using: OpenSearch client instance
        - index (str, optional): Default index name

        Returns:
        List of document instances
        """

    @classmethod
    def search(cls, using=None, index=None):
        """
        Get Search object for this document type.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name

        Returns:
        Search instance configured for this document type
        """

    @classmethod
    def exists(cls, id, using=None, index=None, **kwargs):
        """
        Check if document exists.

        Parameters:
        - id: Document ID
        - using: OpenSearch client instance
        - index (str, optional): Index name

        Returns:
        bool: True if document exists
        """

    def to_dict(self, include_meta=False, skip_empty=True):
        """
        Convert document to dictionary.

        Parameters:
        - include_meta (bool): Include metadata fields
        - skip_empty (bool): Skip fields with empty values

        Returns:
        dict: Document as dictionary
        """

    @classmethod
    def from_dict(cls, d):
        """
        Create document instance from dictionary.

        NOTE(review): opensearch-py builds documents from raw hits with
        ``from_opensearch(hit)``; confirm that ``from_dict`` exists in the
        targeted release before relying on it.

        Parameters:
        - d (dict): Document data

        Returns:
        Document instance
        """

Inner Document Class

For modeling nested objects within documents.

class InnerDoc:
    """Model for an object embedded inside a parent document (API-reference stub)."""

    def __init__(self, **kwargs):
        """
        Build an inner document from the supplied field values.

        Parameters:
        - **kwargs: Values for the inner document's fields
        """

    def to_dict(self, skip_empty=True):
        """
        Serialize the inner document into a plain dictionary.

        Parameters:
        - skip_empty (bool): Leave out fields whose value is empty

        Returns:
        dict: Dictionary form of the inner document
        """

    @classmethod
    def from_dict(cls, d):
        """
        Build an inner document from a dictionary of data.

        Parameters:
        - d (dict): Data for the inner document

        Returns:
        InnerDoc instance
        """

Mapping Management

Define and manage index mappings programmatically.

class Mapping:
    """Programmatic builder for an index mapping (API-reference stub)."""

    def __init__(self):
        """Create a mapping with no fields defined."""

    def field(self, name, field_type, **kwargs):
        """
        Register a field in the mapping.

        Parameters:
        - name (str): Name of the field
        - field_type (str/Field): Type name or a Field instance
        - **kwargs: Parameters for the field

        Returns:
        self (for chaining)
        """

    def meta(self, name, **kwargs):
        """
        Register a metadata field in the mapping.

        Parameters:
        - name (str): Name of the meta field
        - **kwargs: Parameters for the meta field

        Returns:
        self (for chaining)
        """

    def save(self, index, using=None, **kwargs):
        """
        Write the mapping to an OpenSearch index.

        Parameters:
        - index (str): Name of the index
        - using: OpenSearch client instance
        - **kwargs: Additional mapping parameters

        Returns:
        Mapping creation response
        """

    def update_from_opensearch(self, index, using=None):
        """
        Refresh this mapping from an index that already exists in OpenSearch.

        Parameters:
        - index (str): Name of the index
        - using: OpenSearch client instance

        Returns:
        self
        """

    def to_dict(self):
        """
        Serialize the mapping into a plain dictionary.

        Returns:
        dict: Dictionary form of the mapping
        """

Index Management

Manage OpenSearch indices with settings and mappings.

class Index:
    """
    Manager for a single OpenSearch index: settings, mappings and lifecycle.

    API-reference stub: the method bodies are intentionally empty; only the
    signatures and docstrings carry information.
    """

    def __init__(self, name, using=None):
        """
        Initialize index manager.

        Parameters:
        - name (str): Index name
        - using: OpenSearch client instance
        """

    def settings(self, **kwargs):
        """
        Set index settings.

        Parameters:
        - **kwargs: Index settings

        Returns:
        self (for chaining)
        """

    def mapping(self, mapping):
        """
        Set index mapping.

        Parameters:
        - mapping (Mapping): Mapping instance

        Returns:
        self (for chaining)
        """

    def document(self, document):
        """
        Associate a Document subclass with this index.

        Parameters:
        - document (Document): Document class

        Returns:
        The document class (the method can also be used as a class decorator)
        """

    def doc_type(self, document):
        """
        Register document type with index.

        NOTE(review): current opensearch-py releases expose this operation as
        ``document()``; confirm ``doc_type()`` is still available before
        using it in new code.

        Parameters:
        - document (Document): Document class

        Returns:
        self (for chaining)
        """

    def analyzer(self, name, **kwargs):
        """
        Add custom analyzer to index.

        Parameters:
        - name (str): Analyzer name
        - **kwargs: Analyzer configuration

        Returns:
        self (for chaining)
        """

    def create(self, **kwargs):
        """
        Create the index in OpenSearch.

        Parameters:
        - **kwargs: Index creation parameters

        Returns:
        Index creation response
        """

    def delete(self, **kwargs):
        """
        Delete the index from OpenSearch.

        Parameters:
        - **kwargs: Index deletion parameters

        Returns:
        Index deletion response
        """

    def exists(self):
        """
        Check if index exists.

        Returns:
        bool: True if index exists
        """

    def open(self, **kwargs):
        """
        Open the index.

        Parameters:
        - **kwargs: Index open parameters

        Returns:
        Index open response
        """

    def close(self, **kwargs):
        """
        Close the index.

        Parameters:
        - **kwargs: Index close parameters

        Returns:
        Index close response
        """

Usage Examples

Basic Document Model

from opensearchpy import Document, Text, Keyword, Integer, Date
from datetime import datetime

class Article(Document):
    """Example document model for articles stored in the 'articles' index."""
    title = Text(analyzer='standard')
    content = Text()
    author = Keyword()
    category = Keyword()
    published_date = Date()
    view_count = Integer()
    # multi=True: presumably marks the field as holding a list (a list is passed below)
    tags = Keyword(multi=True)
    
    # Index-level configuration for this model: target index name and settings
    class Index:
        name = 'articles'
        settings = {
            'number_of_shards': 1,
            'number_of_replicas': 0
        }
    
    # NOTE(review): a string `doc_type` on Meta looks like a legacy
    # mapping-type setting — confirm it is still needed
    class Meta:
        doc_type = '_doc'

# Create and save document
article = Article(
    title='Introduction to OpenSearch',
    content='OpenSearch is a powerful search and analytics engine...',
    author='john_doe',
    category='technology',
    published_date=datetime.now(),
    view_count=0,
    tags=['search', 'analytics', 'opensource']
)

# Save to OpenSearch
# Assign the document ID explicitly before saving.
article.meta.id = 'article-1'
# assumes `client` is an OpenSearch client created elsewhere — it is not
# defined in this snippet
article.save(using=client)

print(f"Article saved with ID: {article.meta.id}")

Nested Document Modeling

# `datetime` is used for the comment timestamps below, so import it here to
# keep the snippet runnable on its own.
from datetime import datetime

from opensearchpy import Date, Document, InnerDoc, Integer, Keyword, Nested, Text


class Comment(InnerDoc):
    """A single user comment embedded inside a Product document."""
    author = Keyword()
    content = Text()
    created_date = Date()
    rating = Integer()


class Product(Document):
    """Product model whose comments are stored as nested inner documents."""
    name = Text()
    description = Text()
    category = Keyword()
    price = Integer()
    comments = Nested(Comment)

    class Index:
        name = 'products'


# Create product with nested comments
product = Product(
    name='Wireless Headphones',
    description='High-quality wireless headphones with noise cancellation',
    category='electronics',
    price=199,
    comments=[
        Comment(
            author='user1',
            content='Great sound quality!',
            created_date=datetime.now(),
            rating=5
        ),
        Comment(
            author='user2',
            content='Good value for money',
            created_date=datetime.now(),
            rating=4
        )
    ]
)

# `client` is an OpenSearch client instance created elsewhere.
product.save(using=client)

Custom Field Types and Validation

# Field classes and ValidationException are exported from the top-level
# package; there is no `opensearchpy.field` module.
from opensearchpy import Document, Integer, Keyword, Text, ValidationException


class EmailField(Keyword):
    """Keyword field that rejects non-empty values without an '@'."""

    def clean(self, data):
        # Empty values skip the format check and go straight to the base
        # class, which handles them.
        if data and '@' not in data:
            raise ValidationException('Invalid email format')
        return super().clean(data)


class User(Document):
    """User model with field-level and document-level validation."""
    username = Keyword(required=True)
    email = EmailField(required=True)
    full_name = Text()
    age = Integer()
    bio = Text()

    def clean(self):
        """
        Document-level validation.

        Raises:
        ValidationException: If age is negative or username is shorter than
        3 characters
        """
        if self.age is not None and self.age < 0:
            raise ValidationException('Age cannot be negative')

        if self.username and len(self.username) < 3:
            raise ValidationException('Username must be at least 3 characters')

    # No save() override is needed: Document.save() validates by default
    # (validate=True), running every field's clean() and then this class's
    # clean(). Calling self.clean() manually would run the checks twice.

    class Index:
        name = 'users'


# Create user with validation
user = User(
    username='johndoe',
    email='john@example.com',
    full_name='John Doe',
    age=30,
    bio='Software developer interested in search technologies'
)

# Raises ValidationException before anything is sent if validation fails.
# `client` is an OpenSearch client instance created elsewhere.
user.save(using=client)

Document Relationships

from opensearchpy import Document, Join, Text, Keyword, Integer

class BlogPost(Document):
    """Parent side of the post/comment join; stored in the 'blog' index."""
    title = Text()
    content = Text()
    author = Keyword()
    # Join field declaring 'post' as the parent relation of 'comment'
    post_comment = Join(relations={'post': 'comment'})
    
    class Index:
        name = 'blog'

class Comment(Document):
    """Child side of the post/comment join; stored in the same 'blog' index."""
    content = Text()
    author = Keyword()
    # Same join field definition as on BlogPost
    post_comment = Join(relations={'post': 'comment'})
    
    class Index:
        name = 'blog'

# Create parent document (blog post)
post = BlogPost(
    title='My First Blog Post',
    content='This is the content of my first blog post...',
    author='blogger',
    post_comment={'name': 'post'}
)
post.meta.id = 'post-1'
# assumes `client` is an OpenSearch client created elsewhere — it is not
# defined in this snippet
post.save(using=client)

# Create child document (comment)
comment = Comment(
    content='Great post!',
    author='reader',
    # 'parent' carries the ID assigned to the post above
    post_comment={'name': 'comment', 'parent': 'post-1'}
)
comment.meta.id = 'comment-1'
comment.meta.routing = 'post-1'  # Route to same shard as parent
comment.save(using=client)

Dynamic Document Templates

from opensearchpy import Document, Keyword, MetaField, Text


class FlexibleDocument(Document):
    """Document that accepts any fields dynamically."""
    title = Text(required=True)
    category = Keyword()

    class Index:
        name = 'flexible_docs'

    class Meta:
        # Mapping-level options are declared as MetaField attributes on Meta;
        # they belong to the mapping, not to the index settings.
        dynamic = MetaField(True)
        dynamic_templates = MetaField([
            {
                'strings_as_keywords': {
                    'match_mapping_type': 'string',
                    'mapping': {
                        'type': 'keyword'
                    }
                }
            }
        ])


# Create the index and mapping first so the dynamic templates are in place
# before the first document is indexed.
# `client` is an OpenSearch client instance created elsewhere.
FlexibleDocument.init(using=client)

# Create document with dynamic fields
doc = FlexibleDocument(
    title='Dynamic Document',
    category='example',
    # These fields will be added dynamically
    custom_field='custom_value',
    numerical_data=42,
    metadata={'version': '1.0', 'source': 'api'}
)

doc.save(using=client)

Bulk Document Operations

from opensearchpy.helpers import parallel_bulk

def generate_articles(count=1000):
    """Yield `count` Article documents as dictionaries that include metadata."""
    for seq in range(count):
        doc = Article(
            title=f'Article {seq}',
            content=f'Content for article {seq}...',
            author=f'author_{seq % 10}',
            category='technology',
            published_date=datetime.now(),
            view_count=0,
            tags=['tag1', 'tag2']
        )
        doc.meta.id = f'article-{seq}'
        yield doc.to_dict(include_meta=True)

# Stream the generated articles through parallel_bulk in chunks of 100 and
# report every action that failed.
bulk_results = parallel_bulk(
    client,
    generate_articles(1000),
    index='articles',
    chunk_size=100
)
for ok, details in bulk_results:
    if ok:
        continue
    print(f'Failed to index: {details}')

print('Bulk indexing completed')

Search with Document Models

from opensearchpy import Search, Q

# Build the search from the document model in a single chain:
# match on title, filter by category, newest first
title_match = Q('match', title='OpenSearch')
category_term = Q('term', category='technology')
article_search = (
    Article.search(using=client)
    .query(title_match)
    .filter(category_term)
    .sort('-published_date')
)

# Run the search; the hits come back as document instances
for hit in article_search.execute():
    print(f'Title: {hit.title}')
    print(f'Author: {hit.author}')
    print(f'Published: {hit.published_date}')
    print('---')

# Aggregations with document models
agg_search = Article.search(using=client)
# The result of aggs.bucket() is not reassigned: the aggregations are
# attached to agg_search itself
agg_search.aggs.bucket('authors', 'terms', field='author', size=10)
agg_search.aggs.bucket('categories', 'terms', field='category')

agg_response = agg_search.execute()

print('Top authors:')
for author_bucket in agg_response.aggregations.authors.buckets:
    print(f'  {author_bucket.key}: {author_bucket.doc_count} articles')

Index Management with Documents

from opensearchpy import Index, Keyword, Mapping, Text

# Create index with custom settings
# `client` is an OpenSearch client instance created elsewhere.
index = Index('articles', using=client)
index.settings(
    number_of_shards=2,
    number_of_replicas=1,
    analysis={
        'analyzer': {
            'custom_text_analyzer': {
                'type': 'custom',
                'tokenizer': 'standard',
                'filter': ['lowercase', 'stop', 'snowball']
            }
        }
    }
)

# Register the Article document class with the index
# (the method is `document()`, not `doc_type()`)
index.document(Article)

# Create the index
if not index.exists():
    index.create()
    print('Index created successfully')

# Update mapping for existing index
mapping = Mapping()
mapping.field('title', Text(analyzer='custom_text_analyzer'))
mapping.field('summary', Text())
mapping.field('status', Keyword())

mapping.save('articles', using=client)
print('Mapping updated')

Install with Tessl CLI

npx tessl i tessl/pypi-opensearch-py

docs

async-operations.md

authentication.md

core-client.md

document-modeling.md

dsl-queries.md

helper-functions.md

index.md

namespaced-apis.md

plugin-apis.md

tile.json