PyES Client Operations

Overview

The ES class is the main entry point for all ElasticSearch operations in PyES. It manages connections and provides document operations, search functionality, index management, and cluster administration.
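
A minimal end-to-end session using the operations documented below (index name, type, and field values are illustrative):

from pyes import ES, Search, TermQuery

es = ES("localhost:9200")

# Index a document; refresh=True makes it immediately searchable
es.index({"title": "Hello PyES", "tags": ["demo"]}, "demo", "doc",
         id="1", refresh=True)

# Retrieve by ID, then search
doc = es.get("demo", "doc", "1")
results = es.search(Search(TermQuery("tags", "demo")), indices=["demo"])
print(doc.title, results.total)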

ES Class API Reference

class ES:
    """
    Main ElasticSearch client class providing connection management 
    and all ElasticSearch operations.
    
    Args:
        server (str|list): ElasticSearch server(s). Default: "localhost:9200"
        timeout (float): Request timeout in seconds. Default: 30.0
        bulk_size (int): Number of operations per bulk request. Default: 400
        encoder (callable): Custom JSON encoder. Default: None
        decoder (callable): Custom JSON decoder. Default: None
        max_retries (int): Maximum retry attempts. Default: 3
        retry_time (int): Retry delay in seconds. Default: 60
        default_indices (list): Default indices for operations. Default: None
        default_types (list): Default document types. Default: None
        log_curl (bool): Log curl commands. Default: False
        dump_curl (bool): Dump curl to file. Default: False
        model (class): Document model class. Default: ElasticSearchModel
        basic_auth (tuple): (username, password) for authentication. Default: None
        raise_on_bulk_item_failure (bool): Raise on bulk item failures. Default: False
        document_object_field (str): Document object field name. Default: None
        bulker_class (class): Bulk operations class. Default: ListBulker
        cert_reqs (str): SSL certificate requirements. Default: 'CERT_OPTIONAL'
    """
    
    def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
                 encoder=None, decoder=None, max_retries=3, retry_time=60,
                 default_indices=None, default_types=None, log_curl=False,
                 dump_curl=False, model=ElasticSearchModel, basic_auth=None,
                 raise_on_bulk_item_failure=False, document_object_field=None,
                 bulker_class=ListBulker, cert_reqs='CERT_OPTIONAL'):
        pass

Connection Management

Basic Connection

from pyes import ES

# Single server connection
es = ES('localhost:9200')

# Multiple servers for failover
es = ES(['server1:9200', 'server2:9200', 'server3:9200'])

# With authentication
es = ES('localhost:9200', basic_auth=('username', 'password'))

# With SSL configuration
es = ES('https://secure-es.example.com:9200', cert_reqs='CERT_REQUIRED')

Connection Configuration

# Advanced connection configuration
es = ES(
    server="localhost:9200",
    timeout=45.0,           # 45 second timeout
    max_retries=5,          # Retry failed requests 5 times
    retry_time=30,          # Wait 30s between retries
    bulk_size=1000,         # Process 1000 operations per bulk
    log_curl=True,          # Log equivalent curl commands
    raise_on_bulk_item_failure=True  # Raise exceptions on bulk failures
)

Document Operations

Index Documents

def index(self, doc, index, doc_type, id=None, parent=None, 
          force_insert=False, bulk=False, **kwargs):
    """
    Index a document in ElasticSearch.
    
    Args:
        doc (dict): Document to index
        index (str): Index name
        doc_type (str): Document type  
        id (str, optional): Document ID. Auto-generated if None
        parent (str, optional): Parent document ID
        force_insert (bool): Fail if a document with the given ID already exists (create semantics). Default: False
        bulk (bool): Add to bulk buffer instead of immediate index. Default: False
        **kwargs: Additional parameters (routing, refresh, etc.)
        
    Returns:
        str: Document ID if successful
        
    Raises:
        DocumentAlreadyExistsException: If document exists and force_insert=True
        IndexMissingException: If index doesn't exist
    """
    pass

# Basic document indexing
doc = {
    "title": "Python ElasticSearch Tutorial",
    "content": "Learn how to use PyES library effectively",
    "tags": ["python", "elasticsearch", "tutorial"],
    "published_date": "2023-12-01",
    "author": "Jane Developer",
    "view_count": 0
}

# Index with auto-generated ID
doc_id = es.index(doc, "blog", "post")

# Index with specific ID
es.index(doc, "blog", "post", id="tutorial-001")

# Index with parent-child relationship
child_doc = {"comment": "Great tutorial!", "author": "John Reader"}
es.index(child_doc, "blog", "comment", parent="tutorial-001")

# Index with routing for performance
es.index(doc, "blog", "post", routing="python-category")

# Force refresh immediately
es.index(doc, "blog", "post", refresh=True)

Retrieve Documents

def get(self, index, doc_type, id, fields=None, model=None, **query_params):
    """
    Retrieve a document by ID.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        fields (list, optional): Specific fields to retrieve
        model (class, optional): Custom model class for result
        **query_params: Additional parameters (routing, preference, etc.)
        
    Returns:
        Document object with metadata
        
    Raises:
        DocumentMissingException: If document not found
        IndexMissingException: If index doesn't exist
    """
    pass

# Get complete document
document = es.get("blog", "post", "tutorial-001")
print(f"Title: {document.title}")
print(f"Content: {document.content}")
print(f"Document ID: {document._meta.id}")
print(f"Version: {document._meta.version}")

# Get specific fields only
document = es.get("blog", "post", "tutorial-001", fields=["title", "tags"])

# Get with routing
document = es.get("blog", "post", "tutorial-001", routing="python-category")

# Get from specific node preference
document = es.get("blog", "post", "tutorial-001", preference="_local")

Multi-Get Operations

def mget(self, ids, index=None, doc_type=None, **query_params):
    """
    Retrieve multiple documents by IDs.
    
    Args:
        ids (list): List of document IDs or dicts with index/type/id
        index (str, optional): Default index name
        doc_type (str, optional): Default document type
        **query_params: Additional parameters
        
    Returns:
        List of documents (None for missing documents)
    """
    pass

# Get multiple documents from same index/type
docs = es.mget(["tutorial-001", "tutorial-002", "tutorial-003"], 
               index="blog", doc_type="post")

# Get documents from different indices/types
requests = [
    {"_index": "blog", "_type": "post", "_id": "tutorial-001"},
    {"_index": "news", "_type": "article", "_id": "news-001"},
    {"_index": "blog", "_type": "comment", "_id": "comment-001"}
]
docs = es.mget(requests)

# Process results
for doc in docs:
    if doc is not None:
        print(f"Found: {doc.title}")
    else:
        print("Document not found")

Update Documents

def update(self, index, doc_type, id, script=None, lang="mvel", 
           params=None, document=None, upsert=None, **kwargs):
    """
    Update a document using script or partial document.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        script (str, optional): Update script
        lang (str): Script language. Default: "mvel"
        params (dict, optional): Script parameters
        document (dict, optional): Partial document for update
        upsert (dict, optional): Document to create if not exists
        **kwargs: Additional parameters (routing, refresh, etc.)
        
    Returns:
        Update result information
        
    Raises:
        DocumentMissingException: If document not found and no upsert
        VersionConflictEngineException: If version conflict occurs
    """
    pass

def partial_update(self, index, doc_type, id, doc=None, script=None, 
                   params=None, **kwargs):
    """
    Partial update of a document.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        doc (dict, optional): Partial document fields
        script (str, optional): Update script
        params (dict, optional): Script parameters
        **kwargs: Additional parameters
        
    Returns:
        Update result information
    """
    pass

# Script-based update (mvel, the default script language, references
# params directly by name)
es.update("blog", "post", "tutorial-001",
          script="ctx._source.view_count += increment",
          params={"increment": 1})

# Partial document update  
es.partial_update("blog", "post", "tutorial-001", 
                  doc={"tags": ["python", "elasticsearch", "tutorial", "updated"]})

# Update with upsert (create if doesn't exist)
es.update("blog", "post", "new-tutorial",
          document={"title": "New Tutorial", "content": "Content here"},
          upsert={"title": "Default Title", "created_date": "2023-12-01"})

# Conditional update with version
es.update("blog", "post", "tutorial-001",
          document={"status": "published"},
          version=2)  # Only update if current version is 2
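
A versioned update fails with VersionConflictEngineException if another writer changed the document first. The usual response is to re-read and retry; a sketch built from the get()/update() calls above (the attempt count is arbitrary):

from pyes import VersionConflictEngineException

def update_with_retry(es, index, doc_type, doc_id, fields, attempts=3):
    """Retry a versioned partial update after conflicts."""
    for _ in range(attempts):
        current = es.get(index, doc_type, doc_id)
        try:
            es.update(index, doc_type, doc_id, document=fields,
                      version=current._meta.version)
            return True
        except VersionConflictEngineException:
            continue  # someone else won the race; re-read and retry
    return False

update_with_retry(es, "blog", "post", "tutorial-001", {"status": "archived"})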

Delete Documents

def delete(self, index, doc_type, id, bulk=False, **query_params):
    """
    Delete a document by ID.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        bulk (bool): Add to bulk buffer. Default: False
        **query_params: Additional parameters (routing, refresh, etc.)
        
    Returns:
        Deletion result information
        
    Raises:
        DocumentMissingException: If document not found
    """
    pass

def exists(self, index, doc_type, id, **query_params):
    """
    Check if document exists.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID
        **query_params: Additional parameters
        
    Returns:
        bool: True if document exists, False otherwise
    """
    pass

# Delete document
es.delete("blog", "post", "tutorial-001")

# Check if document exists before deletion
if es.exists("blog", "post", "tutorial-001"):
    es.delete("blog", "post", "tutorial-001")
    print("Document deleted")
else:
    print("Document not found")

# Delete with routing
es.delete("blog", "post", "tutorial-001", routing="python-category")

# Bulk deletion (added to bulk buffer)
es.delete("blog", "post", "tutorial-001", bulk=True)
es.delete("blog", "post", "tutorial-002", bulk=True)
es.flush_bulk()  # Process all deletions

Search Operations

Basic Search

def search(self, query, indices=None, doc_types=None, model=None, 
           scan=False, headers=None, **query_params):
    """
    Execute a search query.
    
    Args:
        query (Query|dict): Query object or raw query dict
        indices (list, optional): Indices to search. Uses default_indices if None
        doc_types (list, optional): Document types to search
        model (class, optional): Custom model class for results
        scan (bool): Use scan search for large result sets. Default: False
        headers (dict, optional): Custom HTTP headers
        **query_params: Additional parameters (routing, preference, etc.)
        
    Returns:
        Search results with hits, facets, and metadata
    """
    pass

def search_raw(self, query, indices=None, doc_types=None, 
               headers=None, **query_params):
    """
    Execute search and return raw dictionary result.
    
    Args:
        query (Query|dict): Query object or raw query dict
        indices (list, optional): Indices to search
        doc_types (list, optional): Document types to search
        headers (dict, optional): Custom HTTP headers
        **query_params: Additional parameters
        
    Returns:
        dict: Raw ElasticSearch response
    """
    pass

from pyes import Search, TermQuery, BoolQuery, RangeQuery

# Simple term search
query = Search(TermQuery("tags", "python"))
results = es.search(query, indices=["blog"])

# Process results
print(f"Total hits: {results.total}")
for hit in results:
    print(f"Title: {hit.title}")
    print(f"Score: {hit._meta.score}")
    print(f"Index: {hit._meta.index}")

# Complex boolean search
complex_query = Search(
    BoolQuery(
        must=[TermQuery("status", "published")],
        should=[
            TermQuery("tags", "python"),
            TermQuery("tags", "elasticsearch")
        ],
        must_not=[TermQuery("category", "draft")],
        filter=RangeQuery("published_date", gte="2023-01-01")
    )
).size(20).sort("published_date", order="desc")

results = es.search(complex_query, indices=["blog", "news"])

# Raw search for custom processing
raw_query = {
    "query": {"match": {"title": "python"}},
    "highlight": {"fields": {"title": {}, "content": {}}}
}
raw_results = es.search_raw(raw_query, indices=["blog"])
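
Since search_raw() returns the standard ElasticSearch response body as a dict, hits and highlight fragments are read with plain dictionary access; a sketch assuming the usual hits.hits layout:

for hit in raw_results["hits"]["hits"]:
    source = hit["_source"]
    print(source["title"], hit["_score"])
    # Highlighted fragments appear per requested field, when present
    for fragment in hit.get("highlight", {}).get("title", []):
        print("  ...", fragment)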

Multi-Search

def search_multi(self, queries, indices_list=None, doc_types_list=None, **kwargs):
    """
    Execute multiple search queries in a single request.
    
    Args:
        queries (list): List of query objects or dicts
        indices_list (list, optional): List of indices for each query
        doc_types_list (list, optional): List of doc types for each query
        **kwargs: Additional parameters
        
    Returns:
        list: List of search results for each query
    """
    pass

# Multiple searches in single request
queries = [
    Search(TermQuery("tags", "python")),
    Search(TermQuery("tags", "javascript")), 
    Search(RangeQuery("view_count", gte=1000))
]

indices_list = [["blog"], ["blog"], ["blog", "news"]]

results = es.search_multi(queries, indices_list)

for i, result in enumerate(results):
    print(f"Query {i+1}: {result.total} hits")

Scroll Search for Large Results

def search_scroll(self, scroll_id, scroll="10m"):
    """
    Continue scrolling through search results.
    
    Args:
        scroll_id (str): Scroll ID from previous search
        scroll (str): Scroll timeout. Default: "10m"
        
    Returns:
        Next batch of search results
    """
    pass

# Initial search with scroll
query = Search(TermQuery("status", "published")).size(1000)
results = es.search(query, indices=["blog"], scroll="5m")

all_docs = list(results)  # First batch

# Continue scrolling for remaining results
while results.total > len(all_docs):
    results = es.search_scroll(results._scroll_id, scroll="5m")
    batch = list(results)
    if not batch:  # no more results
        break
    all_docs.extend(batch)

print(f"Retrieved {len(all_docs)} total documents")

# Scan search for memory-efficient large result processing  
query = Search(TermQuery("category", "products"))
results = es.search(query, indices=["catalog"], scan=True, scroll="2m", size=100)

for batch in results:
    for doc in batch:
        process_document(doc)  # Process each document

Count and Delete by Query

def count(self, query=None, indices=None, doc_types=None, **query_params):
    """
    Count documents matching query.
    
    Args:
        query (Query|dict, optional): Query to count. Counts all if None
        indices (list, optional): Indices to search
        doc_types (list, optional): Document types to search
        **query_params: Additional parameters
        
    Returns:
        int: Number of matching documents
    """
    pass

def delete_by_query(self, indices, doc_types, query, **query_params):
    """
    Delete documents matching query.
    
    Args:
        indices (list): Indices to delete from
        doc_types (list): Document types to delete from
        query (Query|dict): Query to match documents for deletion
        **query_params: Additional parameters
        
    Returns:
        Deletion result information
    """
    pass

# Count all documents
total_docs = es.count(indices=["blog"])

# Count with query
python_posts = es.count(TermQuery("tags", "python"), indices=["blog"])

# Delete old documents
old_query = RangeQuery("published_date", lt="2020-01-01")
deletion_result = es.delete_by_query(["blog"], ["post"], old_query)
print(f"Deleted {deletion_result.total} old posts")

Suggestion Operations

Auto-completion and Suggestions

def suggest(self, name, text, field, type='term', size=None, params=None, **kwargs):
    """
    Get suggestions for text.
    
    Args:
        name (str): Suggestion name
        text (str): Text to get suggestions for
        field (str): Field to suggest on
        type (str): Suggestion type ('term', 'phrase', 'completion'). Default: 'term'
        size (int, optional): Number of suggestions to return
        params (dict, optional): Additional suggestion parameters
        **kwargs: Additional parameters (indices, etc.)
        
    Returns:
        Suggestion results
    """
    pass

def suggest_from_object(self, suggest, indices=None, preference=None, **kwargs):
    """
    Get suggestions from Suggest object.
    
    Args:
        suggest (Suggest): Suggest object with configured suggestions
        indices (list, optional): Indices to suggest from
        preference (str, optional): Node preference
        **kwargs: Additional parameters
        
    Returns:
        Suggestion results
    """
    pass

from pyes import Suggest

# Term suggestions for typos
suggestions = es.suggest("title_suggest", "pythno", "title", type="term")

# Phrase suggestions 
phrase_suggestions = es.suggest("content_suggest", "elsticsearch", "content", 
                                type="phrase", size=3)

# Multiple suggestions using Suggest object
suggest = Suggest()
suggest.add_term("python tutorial", "title_suggest", "title")
suggest.add_phrase("elasticsearch guide", "content_suggest", "content") 
suggest.add_completion("py", "tag_suggest", "tags.suggest")

all_suggestions = es.suggest_from_object(suggest, indices=["blog"])

# Process suggestions
for suggestion_name, suggestion_results in all_suggestions.items():
    print(f"Suggestions for {suggestion_name}:")
    for option in suggestion_results[0].options:
        print(f"  - {option.text} (score: {option.score})")

File Operations

File Indexing and Retrieval

def put_file(self, filename, index, doc_type, id=None, name=None):
    """
    Index a file as attachment.
    
    Args:
        filename (str): Path to file to index
        index (str): Index name
        doc_type (str): Document type
        id (str, optional): Document ID. Auto-generated if None
        name (str, optional): Name for the attachment
        
    Returns:
        str: Document ID
    """
    pass

def get_file(self, index, doc_type, id=None):
    """
    Retrieve an indexed file.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str, optional): Document ID
        
    Returns:
        File content and metadata
    """
    pass

# Index PDF file
doc_id = es.put_file("/path/to/document.pdf", "documents", "attachment", 
                     name="Important Document")

# Index with metadata
import os
from pyes import file_to_attachment

with open("/path/to/document.pdf", "rb") as f:
    attachment = file_to_attachment(f.read(), "document.pdf")
    
doc = {
    "title": "Important Document",
    "uploaded_by": "john.doe",
    "upload_date": "2023-12-01",
    "file": attachment
}

es.index(doc, "documents", "attachment", id="doc-001")

# Retrieve file
file_doc = es.get_file("documents", "attachment", "doc-001")
print(f"File name: {file_doc.file.title}")
print(f"File size: {len(file_doc.file.content)} bytes")

Percolator Operations

Query Registration and Matching

def create_percolator(self, index, name, query, **kwargs):
    """
    Create a percolator query.
    
    Args:
        index (str): Index name
        name (str): Percolator name
        query (Query|dict): Query to register
        **kwargs: Additional parameters
        
    Returns:
        Creation result
    """
    pass

def delete_percolator(self, index, name):
    """
    Delete a percolator query.
    
    Args:
        index (str): Index name  
        name (str): Percolator name
        
    Returns:
        Deletion result
    """
    pass

def percolate(self, index, doc_types, query):
    """
    Test document against registered percolator queries.
    
    Args:
        index (str): Index name
        doc_types (list): Document types
        query (dict): Document to test
        
    Returns:
        Matching percolator queries
    """
    pass

# Register percolator queries for content filtering
python_query = TermQuery("tags", "python")
es.create_percolator("blog", "python_posts", python_query)

tutorial_query = BoolQuery(
    must=[TermQuery("category", "tutorial")],
    should=[TermQuery("difficulty", "beginner")]
)
es.create_percolator("blog", "beginner_tutorials", tutorial_query)

# Test document against percolators
test_doc = {
    "title": "Python Basics Tutorial",
    "tags": ["python", "programming"],
    "category": "tutorial",
    "difficulty": "beginner"
}

matches = es.percolate("blog", ["post"], {"doc": test_doc})
print(f"Matching queries: {[match.id for match in matches]}")
# Output: ['python_posts', 'beginner_tutorials']
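
Registered percolators persist until they are removed; delete_percolator() (documented above) unregisters one by name. A short sketch:

# Unregister a percolator that is no longer needed
es.delete_percolator("blog", "python_posts")

# Subsequent percolation no longer reports it
matches = es.percolate("blog", ["post"], {"doc": test_doc})
print([match.id for match in matches])  # ['beginner_tutorials']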

More Like This

Similar Document Discovery

def morelikethis(self, index, doc_type, id, fields, **query_params):
    """
    Find documents similar to the specified document.
    
    Args:
        index (str): Index name
        doc_type (str): Document type
        id (str): Document ID to find similar documents for
        fields (list): Fields to use for similarity calculation
        **query_params: Additional MLT parameters (min_term_freq, max_query_terms, etc.)
        
    Returns:
        Similar documents
    """
    pass

# Find similar blog posts
similar_posts = es.morelikethis(
    "blog", "post", "tutorial-001",
    fields=["title", "content", "tags"],
    min_term_freq=1,
    max_query_terms=12,
    min_doc_freq=1,
    stop_words=["the", "and", "or", "but"]
)

print(f"Found {similar_posts.total} similar posts:")
for post in similar_posts:
    print(f"  - {post.title} (score: {post._meta.score})")

Properties and Configuration

Dynamic Properties

@property
def mappings(self):
    """
    Get Mapper instance for mapping management.
    
    Returns:
        Mapper: Mapping management instance
    """
    pass

@property  
def default_indices(self):
    """
    Get default indices for operations.
    
    Returns:
        list: Default indices
    """
    pass

@default_indices.setter
def default_indices(self, indices):
    """
    Set default indices for operations.
    
    Args:
        indices (list): Default indices to use
    """
    pass

@property
def bulk_size(self):
    """
    Get current bulk operation size.
    
    Returns:
        int: Current bulk size
    """
    pass

@bulk_size.setter  
def bulk_size(self, size):
    """
    Set bulk operation size.
    
    Args:
        size (int): New bulk size
    """
    pass

# Configure default behavior
es.default_indices = ["blog", "news"]  # Default to these indices
es.bulk_size = 1000  # Process 1000 operations per bulk

# Access mapping management
mapping = es.mappings
mapping.create_index_if_missing("new_index")

# Search using default indices (no need to specify)
results = es.search(TermQuery("status", "published"))  # Uses default_indices

Error Handling

Exception Management

from pyes import (
    ElasticSearchException, IndexMissingException, 
    DocumentMissingException, DocumentAlreadyExistsException,
    VersionConflictEngineException, BulkOperationException,
    NoServerAvailable
)

try:
    # Document operations with error handling
    doc_id = es.index(document, "blog", "post", id="existing-id")
    
except DocumentAlreadyExistsException:
    # Handle duplicate document
    print("Document already exists, updating instead")
    es.update("blog", "post", "existing-id", document=document)
    
except IndexMissingException:
    # Create index and retry
    print("Index missing, creating index")
    es.indices.create_index("blog")
    doc_id = es.index(document, "blog", "post", id="existing-id")
    
except VersionConflictEngineException as e:
    # Handle version conflicts
    print(f"Version conflict: {e}")
    current_doc = es.get("blog", "post", "existing-id")
    # Resolve conflict and retry with current version
    
except NoServerAvailable:
    # Handle connection failures
    print("No ElasticSearch servers available")
    # Implement fallback or retry logic
    
except ElasticSearchException as e:
    # Handle general ES exceptions
    print(f"ElasticSearch error: {e}")

Best Practices

Performance Optimization

# Connection pooling for concurrent applications
import threading

class ESConnectionPool:
    def __init__(self, servers, pool_size=10):
        self.servers = servers
        self.pool = []
        self.lock = threading.Lock()
        
        for _ in range(pool_size):
            es = ES(servers, timeout=30, max_retries=3)
            self.pool.append(es)
    
    def get_connection(self):
        with self.lock:
            if self.pool:
                return self.pool.pop()
            else:
                return ES(self.servers)
    
    def return_connection(self, es):
        with self.lock:
            self.pool.append(es)

# Use connection pool
pool = ESConnectionPool(['server1:9200', 'server2:9200'])

def worker_function():
    es = pool.get_connection()
    try:
        # Perform operations
        results = es.search(query, indices=["data"])
        # Process results
    finally:
        pool.return_connection(es)

Bulk Processing Patterns

# Efficient bulk processing with error handling
def bulk_index_documents(es, documents, index, doc_type):
    """Efficiently index large numbers of documents."""
    
    es.bulk_size = 1000  # Optimize batch size
    failed_docs = []
    
    try:
        for i, doc in enumerate(documents):
            try:
                es.index(doc, index, doc_type, bulk=True)
                
                # Force flush every 10,000 documents
                if i % 10000 == 0:
                    es.flush_bulk()
                    
            except Exception as e:
                failed_docs.append((i, doc, str(e)))
                
        # Final flush
        es.flush_bulk()
        
    except BulkOperationException as e:
        # Handle bulk operation failures
        for failure in e.errors:
            print(f"Bulk failure: {failure}")
            
    return failed_docs

# Usage
documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(50000)]
failures = bulk_index_documents(es, documents, "bulk_index", "document")

if failures:
    print(f"Failed to index {len(failures)} documents")
    # Implement retry logic for failures
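
One straightforward retry strategy is to re-index the failed documents individually, without bulk=True, so each error surfaces on its own; a sketch over the (position, doc, error) tuples collected above:

def retry_failures(es, failures, index, doc_type):
    """Re-index failed documents one by one; return those that still fail."""
    still_failing = []
    for position, doc, error in failures:
        try:
            es.index(doc, index, doc_type)
        except Exception as e:
            still_failing.append((position, doc, str(e)))
    return still_failing

remaining = retry_failures(es, failures, "bulk_index", "document")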

The ES client provides comprehensive functionality for all ElasticSearch operations with robust error handling, flexible configuration options, and performance optimization features.
