Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities
—
Object-relational mapping (ORM) style document modeling with field definitions, automatic mapping generation, and validation for structured data handling in OpenSearch. The DSL provides a Pythonic way to define document schemas and interact with OpenSearch indices.
Base class for creating document models with ORM-like functionality.
class Document:
    """Base class for creating document models with ORM-like functionality."""

    def __init__(self, **kwargs):
        """
        Initialize document instance with field values.

        Parameters:
        - **kwargs: Field values for the document
        """

    def save(self, using=None, index=None, **kwargs):
        """
        Save document to OpenSearch.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name (uses class Meta if not provided)
        - refresh (str/bool, optional): Refresh policy
        - routing (str, optional): Routing value
        - pipeline (str, optional): Ingest pipeline

        Returns:
        Document instance with updated meta information
        """

    def update(self, using=None, index=None, **kwargs):
        """
        Update document in OpenSearch.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name
        - refresh (str/bool, optional): Refresh policy
        - retry_on_conflict (int, optional): Retry on version conflict

        Returns:
        Updated document instance
        """

    def delete(self, using=None, index=None, **kwargs):
        """
        Delete document from OpenSearch.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name
        - refresh (str/bool, optional): Refresh policy

        Returns:
        Deletion response
        """

    @classmethod
    def get(cls, id, using=None, index=None, **kwargs):
        """
        Retrieve document by ID.

        Parameters:
        - id: Document ID
        - using: OpenSearch client instance
        - index (str, optional): Index name

        Returns:
        Document instance

        Raises:
        NotFoundError: If document doesn't exist
        """

    @classmethod
    def mget(cls, docs, using=None, index=None, **kwargs):
        """
        Retrieve multiple documents by ID.

        Parameters:
        - docs: List of document IDs or dicts with id/index
        - using: OpenSearch client instance
        - index (str, optional): Default index name

        Returns:
        List of document instances
        """

    @classmethod
    def search(cls, using=None, index=None):
        """
        Get Search object for this document type.

        Parameters:
        - using: OpenSearch client instance
        - index (str, optional): Index name

        Returns:
        Search instance configured for this document type
        """

    @classmethod
    def exists(cls, id, using=None, index=None, **kwargs):
        """
        Check if document exists.

        Parameters:
        - id: Document ID
        - using: OpenSearch client instance
        - index (str, optional): Index name

        Returns:
        bool: True if document exists
        """

    def to_dict(self, include_meta=False, skip_empty=True):
        """
        Convert document to dictionary.

        Parameters:
        - include_meta (bool): Include metadata fields
        - skip_empty (bool): Skip fields with empty values

        Returns:
        dict: Document as dictionary
        """

    @classmethod
    def from_dict(cls, d):
        """
        Create document instance from dictionary.

        Parameters:
        - d (dict): Document data

        Returns:
        Document instance
        """


# For modeling nested objects within documents.
class InnerDoc:
    """Model for nested objects within documents."""

    def __init__(self, **kwargs):
        """
        Initialize inner document with field values.

        Parameters:
        - **kwargs: Field values for the inner document
        """

    def to_dict(self, skip_empty=True):
        """
        Convert inner document to dictionary.

        Parameters:
        - skip_empty (bool): Skip fields with empty values

        Returns:
        dict: Inner document as dictionary
        """

    @classmethod
    def from_dict(cls, d):
        """
        Create inner document from dictionary.

        Parameters:
        - d (dict): Inner document data

        Returns:
        InnerDoc instance
        """


# Define and manage index mappings programmatically.
class Mapping:
    """Define and manage index mappings programmatically."""

    def __init__(self):
        """Initialize empty mapping."""

    def field(self, name, field_type, **kwargs):
        """
        Add field to mapping.

        Parameters:
        - name (str): Field name
        - field_type (str/Field): Field type or Field instance
        - **kwargs: Field parameters

        Returns:
        self (for chaining)
        """

    def meta(self, name, **kwargs):
        """
        Add metadata field to mapping.

        Parameters:
        - name (str): Meta field name
        - **kwargs: Meta field parameters

        Returns:
        self (for chaining)
        """

    def save(self, index, using=None, **kwargs):
        """
        Save mapping to OpenSearch index.

        Parameters:
        - index (str): Index name
        - using: OpenSearch client instance
        - **kwargs: Additional mapping parameters

        Returns:
        Mapping creation response
        """

    def update_from_opensearch(self, index, using=None):
        """
        Update mapping from existing OpenSearch index.

        Parameters:
        - index (str): Index name
        - using: OpenSearch client instance

        Returns:
        self
        """

    def to_dict(self):
        """
        Convert mapping to dictionary.

        Returns:
        dict: Mapping as dictionary
        """


# Manage OpenSearch indices with settings and mappings.
class Index:
    """Manage OpenSearch indices with settings and mappings."""

    def __init__(self, name, using=None):
        """
        Initialize index manager.

        Parameters:
        - name (str): Index name
        - using: OpenSearch client instance
        """

    def settings(self, **kwargs):
        """
        Set index settings.

        Parameters:
        - **kwargs: Index settings

        Returns:
        self (for chaining)
        """

    def mapping(self, mapping):
        """
        Set index mapping.

        Parameters:
        - mapping (Mapping): Mapping instance

        Returns:
        self (for chaining)
        """

    def doc_type(self, document):
        """
        Register document type with index.

        Parameters:
        - document (Document): Document class

        Returns:
        self (for chaining)
        """

    def analyzer(self, name, **kwargs):
        """
        Add custom analyzer to index.

        Parameters:
        - name (str): Analyzer name
        - **kwargs: Analyzer configuration

        Returns:
        self (for chaining)
        """

    def create(self, **kwargs):
        """
        Create the index in OpenSearch.

        Parameters:
        - **kwargs: Index creation parameters

        Returns:
        Index creation response
        """

    def delete(self, **kwargs):
        """
        Delete the index from OpenSearch.

        Parameters:
        - **kwargs: Index deletion parameters

        Returns:
        Index deletion response
        """

    def exists(self):
        """
        Check if index exists.

        Returns:
        bool: True if index exists
        """

    def open(self, **kwargs):
        """
        Open the index.

        Parameters:
        - **kwargs: Index open parameters

        Returns:
        Index open response
        """

    def close(self, **kwargs):
        """
        Close the index.

        Parameters:
        - **kwargs: Index close parameters

        Returns:
        Index close response
        """


from opensearchpy import Document, Text, Keyword, Integer, Date
from datetime import datetime


class Article(Document):
    """Example document model stored in the 'articles' index."""

    title = Text(analyzer='standard')
    content = Text()
    author = Keyword()
    category = Keyword()
    published_date = Date()
    view_count = Integer()
    tags = Keyword(multi=True)  # multi=True: the field holds a list of values

    class Index:
        name = 'articles'
        settings = {
            'number_of_shards': 1,
            'number_of_replicas': 0
        }

    class Meta:
        doc_type = '_doc'


# Create and save document
article = Article(
    title='Introduction to OpenSearch',
    content='OpenSearch is a powerful search and analytics engine...',
    author='john_doe',
    category='technology',
    published_date=datetime.now(),
    view_count=0,
    tags=['search', 'analytics', 'opensource']
)

# Save to OpenSearch
# NOTE: `client` is not defined in this snippet; it is assumed to be an
# OpenSearch client instance created elsewhere.
article.meta.id = 'article-1'
article.save(using=client)
print(f"Article saved with ID: {article.meta.id}")

from opensearchpy import Document, InnerDoc, Nested, Text, Keyword, Integer, Date
# This snippet calls datetime.now() below, so it needs its own import.
from datetime import datetime


class Comment(InnerDoc):
    """Nested comment embedded inside a Product document."""

    author = Keyword()
    content = Text()
    created_date = Date()
    rating = Integer()


class Product(Document):
    """Example document model with a nested list of Comment objects."""

    name = Text()
    description = Text()
    category = Keyword()
    price = Integer()
    comments = Nested(Comment)

    class Index:
        name = 'products'


# Create product with nested comments
product = Product(
    name='Wireless Headphones',
    description='High-quality wireless headphones with noise cancellation',
    category='electronics',
    price=199,
    comments=[
        Comment(
            author='user1',
            content='Great sound quality!',
            created_date=datetime.now(),
            rating=5
        ),
        Comment(
            author='user2',
            content='Good value for money',
            created_date=datetime.now(),
            rating=4
        )
    ]
)
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
product.save(using=client)

from opensearchpy import Document, Field, ValidationException
# Field classes are imported from the package top level, as in the other
# snippets on this page.
from opensearchpy import Integer, Keyword, Text


class EmailField(Keyword):
    """Keyword field that rejects values without an '@' character."""

    def clean(self, data):
        # Field-level validation runs before the base class checks
        if data and '@' not in data:
            raise ValidationException('Invalid email format')
        return super().clean(data)


class User(Document):
    """Example document model with field-level and document-level validation."""

    username = Keyword(required=True)
    email = EmailField(required=True)
    full_name = Text()
    age = Integer()
    bio = Text()

    def clean(self):
        # Document-level validation
        if self.age is not None and self.age < 0:
            raise ValidationException('Age cannot be negative')
        if self.username and len(self.username) < 3:
            raise ValidationException('Username must be at least 3 characters')

    def save(self, **kwargs):
        # Custom save logic
        self.clean()
        return super().save(**kwargs)

    class Index:
        name = 'users'


# Create user with validation
user = User(
    username='johndoe',
    email='john@example.com',
    full_name='John Doe',
    age=30,
    bio='Software developer interested in search technologies'
)
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
user.save(using=client)

from opensearchpy import Document, Join, Text, Keyword, Integer
class BlogPost(Document):
    """Parent side of the post/comment join relation in the 'blog' index."""

    title = Text()
    content = Text()
    author = Keyword()
    post_comment = Join(relations={'post': 'comment'})

    class Index:
        name = 'blog'


class Comment(Document):
    """Child side of the post/comment join relation in the 'blog' index."""

    content = Text()
    author = Keyword()
    post_comment = Join(relations={'post': 'comment'})

    class Index:
        name = 'blog'


# Create parent document (blog post)
post = BlogPost(
    title='My First Blog Post',
    content='This is the content of my first blog post...',
    author='blogger',
    post_comment={'name': 'post'}
)
post.meta.id = 'post-1'
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
post.save(using=client)

# Create child document (comment)
comment = Comment(
    content='Great post!',
    author='reader',
    post_comment={'name': 'comment', 'parent': 'post-1'}
)
comment.meta.id = 'comment-1'
comment.meta.routing = 'post-1'  # Route to same shard as parent
comment.save(using=client)

from opensearchpy import Document, DynamicDocument, Text, Keyword
from opensearchpy import MetaField


# NOTE(review): confirm that the installed opensearch-py release exports
# DynamicDocument; this example imports it but the page does not describe it.
class FlexibleDocument(DynamicDocument):
    """Document that accepts any fields dynamically."""

    title = Text(required=True)
    category = Keyword()

    class Index:
        name = 'flexible_docs'

    class Meta:
        # Mapping-level options (dynamic, dynamic_templates) are part of the
        # index mapping rather than the index settings, so they are declared
        # here as MetaField values instead of under Index.settings.
        dynamic = MetaField(True)
        dynamic_templates = MetaField([
            {
                'strings_as_keywords': {
                    'match_mapping_type': 'string',
                    'mapping': {
                        'type': 'keyword'
                    }
                }
            }
        ])


# Create document with dynamic fields
doc = FlexibleDocument(
    title='Dynamic Document',
    category='example',
    # These fields will be added dynamically
    custom_field='custom_value',
    numerical_data=42,
    metadata={'version': '1.0', 'source': 'api'}
)
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
doc.save(using=client)

from opensearchpy.helpers import parallel_bulk
def generate_articles(count=1000):
    """
    Generate article documents.

    Parameters:
    - count (int): Number of articles to generate

    Yields:
    dict: Each Article converted with to_dict(include_meta=True)
    """
    # Relies on the Article model and the datetime import from the earlier
    # example on this page.
    for i in range(count):
        article = Article(
            title=f'Article {i}',
            content=f'Content for article {i}...',
            author=f'author_{i % 10}',
            category='technology',
            published_date=datetime.now(),
            view_count=0,
            tags=['tag1', 'tag2']
        )
        article.meta.id = f'article-{i}'
        yield article.to_dict(include_meta=True)
# Bulk save articles
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
for success, info in parallel_bulk(
    client,
    generate_articles(1000),
    index='articles',
    chunk_size=100
):
    # Report only the actions that failed
    if not success:
        print(f'Failed to index: {info}')
print('Bulk indexing completed')

from opensearchpy import Search, Q
# Search using document model
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
s = Article.search(using=client)
s = s.query(Q('match', title='OpenSearch'))
s = s.filter(Q('term', category='technology'))
s = s.sort('-published_date')

# Execute search and get document instances
response = s.execute()
for article in response:
    print(f'Title: {article.title}')
    print(f'Author: {article.author}')
    print(f'Published: {article.published_date}')
    print('---')

# Aggregations with document models
s = Article.search(using=client)
s.aggs.bucket('authors', 'terms', field='author', size=10)
s.aggs.bucket('categories', 'terms', field='category')
response = s.execute()
print('Top authors:')
for bucket in response.aggregations.authors.buckets:
    print(f'  {bucket.key}: {bucket.doc_count} articles')

from opensearchpy import Index
# Create index with custom settings
# NOTE: `client` is assumed to be an OpenSearch client created elsewhere.
index = Index('articles', using=client)
index.settings(
    number_of_shards=2,
    number_of_replicas=1,
    analysis={
        'analyzer': {
            'custom_text_analyzer': {
                'type': 'custom',
                'tokenizer': 'standard',
                'filter': ['lowercase', 'stop', 'snowball']
            }
        }
    }
)

# Register document type
# NOTE(review): newer DSL releases may expose this method as `document`
# rather than `doc_type`; confirm against the installed opensearch-py version.
index.doc_type(Article)

# Create the index
if not index.exists():
    index.create()
    print('Index created successfully')

# Update mapping for existing index
from opensearchpy import Mapping, Text, Keyword
mapping = Mapping()
mapping.field('title', Text(analyzer='custom_text_analyzer'))
mapping.field('summary', Text())
mapping.field('status', Keyword())
mapping.save('articles', using=client)
print('Mapping updated')

# Install with Tessl CLI
npx tessl i tessl/pypi-opensearch-py