Python Elastic Search driver providing a pythonic interface for interacting with ElasticSearch clusters
—
The ES class is the main entry point for all ElasticSearch operations in PyES. It manages connections, provides document operations, search functionality, index management, and cluster administration.
class ES:
"""
Main ElasticSearch client class providing connection management
and all ElasticSearch operations.
Args:
server (str|list): ElasticSearch server(s). Default: "localhost:9200"
timeout (float): Request timeout in seconds. Default: 30.0
bulk_size (int): Number of operations per bulk request. Default: 400
encoder (callable): Custom JSON encoder. Default: None
decoder (callable): Custom JSON decoder. Default: None
max_retries (int): Maximum retry attempts. Default: 3
retry_time (int): Retry delay in seconds. Default: 60
default_indices (list): Default indices for operations. Default: None
default_types (list): Default document types. Default: None
log_curl (bool): Log curl commands. Default: False
dump_curl (bool): Dump curl to file. Default: False
model (class): Document model class. Default: ElasticSearchModel
basic_auth (tuple): (username, password) for authentication. Default: None
raise_on_bulk_item_failure (bool): Raise on bulk item failures. Default: False
document_object_field (str): Document object field name. Default: None
bulker_class (class): Bulk operations class. Default: ListBulker
cert_reqs (str): SSL certificate requirements. Default: 'CERT_OPTIONAL'
"""
def __init__(self, server="localhost:9200", timeout=30.0, bulk_size=400,
encoder=None, decoder=None, max_retries=3, retry_time=60,
default_indices=None, default_types=None, log_curl=False,
dump_curl=False, model=ElasticSearchModel, basic_auth=None,
raise_on_bulk_item_failure=False, document_object_field=None,
bulker_class=ListBulker, cert_reqs='CERT_OPTIONAL'):
pass

from pyes import ES
# Single server connection
es = ES('localhost:9200')
# Multiple servers for failover
es = ES(['server1:9200', 'server2:9200', 'server3:9200'])
# With authentication
es = ES('localhost:9200', basic_auth=('username', 'password'))
# With SSL configuration
es = ES('https://secure-es.example.com:9200', cert_reqs='CERT_REQUIRED')

# Advanced connection configuration
es = ES(
server="localhost:9200",
timeout=45.0, # 45 second timeout
max_retries=5, # Retry failed requests 5 times
retry_time=30, # Wait 30s between retries
bulk_size=1000, # Process 1000 operations per bulk
log_curl=True, # Log equivalent curl commands
raise_on_bulk_item_failure=True # Raise exceptions on bulk failures
)

def index(self, doc, index, doc_type, id=None, parent=None,
force_insert=False, bulk=False, **kwargs):
"""
Index a document in ElasticSearch.
Args:
doc (dict): Document to index
index (str): Index name
doc_type (str): Document type
id (str, optional): Document ID. Auto-generated if None
parent (str, optional): Parent document ID
force_insert (bool): Use PUT instead of POST. Default: False
bulk (bool): Add to bulk buffer instead of immediate index. Default: False
**kwargs: Additional parameters (routing, refresh, etc.)
Returns:
str: Document ID if successful
Raises:
DocumentAlreadyExistsException: If document exists and force_insert=True
IndexMissingException: If index doesn't exist
"""
pass
# Basic document indexing
doc = {
"title": "Python ElasticSearch Tutorial",
"content": "Learn how to use PyES library effectively",
"tags": ["python", "elasticsearch", "tutorial"],
"published_date": "2023-12-01",
"author": "Jane Developer",
"view_count": 0
}
# Index with auto-generated ID
doc_id = es.index(doc, "blog", "post")
# Index with specific ID
es.index(doc, "blog", "post", id="tutorial-001")
# Index with parent-child relationship
child_doc = {"comment": "Great tutorial!", "author": "John Reader"}
es.index(child_doc, "blog", "comment", parent="tutorial-001")
# Index with routing for performance
es.index(doc, "blog", "post", routing="python-category")
# Force refresh immediately
es.index(doc, "blog", "post", refresh=True)

def get(self, index, doc_type, id, fields=None, model=None, **query_params):
"""
Retrieve a document by ID.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID
fields (list, optional): Specific fields to retrieve
model (class, optional): Custom model class for result
**query_params: Additional parameters (routing, preference, etc.)
Returns:
Document object with metadata
Raises:
DocumentMissingException: If document not found
IndexMissingException: If index doesn't exist
"""
pass
# Get complete document
document = es.get("blog", "post", "tutorial-001")
print(f"Title: {document.title}")
print(f"Content: {document.content}")
print(f"Document ID: {document._meta.id}")
print(f"Version: {document._meta.version}")
# Get specific fields only
document = es.get("blog", "post", "tutorial-001", fields=["title", "tags"])
# Get with routing
document = es.get("blog", "post", "tutorial-001", routing="python-category")
# Get from specific node preference
document = es.get("blog", "post", "tutorial-001", preference="_local")

def mget(self, ids, index=None, doc_type=None, **query_params):
"""
Retrieve multiple documents by IDs.
Args:
ids (list): List of document IDs or dicts with index/type/id
index (str, optional): Default index name
doc_type (str, optional): Default document type
**query_params: Additional parameters
Returns:
List of documents (None for missing documents)
"""
pass
# Get multiple documents from same index/type
docs = es.mget(["tutorial-001", "tutorial-002", "tutorial-003"],
index="blog", doc_type="post")
# Get documents from different indices/types
requests = [
{"_index": "blog", "_type": "post", "_id": "tutorial-001"},
{"_index": "news", "_type": "article", "_id": "news-001"},
{"_index": "blog", "_type": "comment", "_id": "comment-001"}
]
docs = es.mget(requests)
# Process results
for doc in docs:
if doc is not None:
print(f"Found: {doc.title}")
else:
print("Document not found")

def update(self, index, doc_type, id, script=None, lang="mvel",
params=None, document=None, upsert=None, **kwargs):
"""
Update a document using script or partial document.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID
script (str, optional): Update script
lang (str): Script language. Default: "mvel"
params (dict, optional): Script parameters
document (dict, optional): Partial document for update
upsert (dict, optional): Document to create if not exists
**kwargs: Additional parameters (routing, refresh, etc.)
Returns:
Update result information
Raises:
DocumentMissingException: If document not found and no upsert
VersionConflictEngineException: If version conflict occurs
"""
pass
def partial_update(self, index, doc_type, id, doc=None, script=None,
params=None, **kwargs):
"""
Partial update of a document.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID
doc (dict, optional): Partial document fields
script (str, optional): Update script
params (dict, optional): Script parameters
**kwargs: Additional parameters
Returns:
Update result information
"""
pass
# Script-based update
es.update("blog", "post", "tutorial-001",
script="ctx._source.view_count += params.increment",
params={"increment": 1})
# Partial document update
es.partial_update("blog", "post", "tutorial-001",
doc={"tags": ["python", "elasticsearch", "tutorial", "updated"]})
# Update with upsert (create if doesn't exist)
es.update("blog", "post", "new-tutorial",
document={"title": "New Tutorial", "content": "Content here"},
upsert={"title": "Default Title", "created_date": "2023-12-01"})
# Conditional update with version
es.update("blog", "post", "tutorial-001",
document={"status": "published"},
version=2)  # Only update if current version is 2

def delete(self, index, doc_type, id, bulk=False, **query_params):
"""
Delete a document by ID.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID
bulk (bool): Add to bulk buffer. Default: False
**query_params: Additional parameters (routing, refresh, etc.)
Returns:
Deletion result information
Raises:
DocumentMissingException: If document not found
"""
pass
def exists(self, index, doc_type, id, **query_params):
"""
Check if document exists.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID
**query_params: Additional parameters
Returns:
bool: True if document exists, False otherwise
"""
pass
# Delete document
es.delete("blog", "post", "tutorial-001")
# Check if document exists before deletion
if es.exists("blog", "post", "tutorial-001"):
es.delete("blog", "post", "tutorial-001")
print("Document deleted")
else:
print("Document not found")
# Delete with routing
es.delete("blog", "post", "tutorial-001", routing="python-category")
# Bulk deletion (added to bulk buffer)
es.delete("blog", "post", "tutorial-001", bulk=True)
es.delete("blog", "post", "tutorial-002", bulk=True)
es.flush_bulk()  # Process all deletions

def search(self, query, indices=None, doc_types=None, model=None,
scan=False, headers=None, **query_params):
"""
Execute a search query.
Args:
query (Query|dict): Query object or raw query dict
indices (list, optional): Indices to search. Uses default_indices if None
doc_types (list, optional): Document types to search
model (class, optional): Custom model class for results
scan (bool): Use scan search for large result sets. Default: False
headers (dict, optional): Custom HTTP headers
**query_params: Additional parameters (routing, preference, etc.)
Returns:
Search results with hits, facets, and metadata
"""
pass
def search_raw(self, query, indices=None, doc_types=None,
headers=None, **query_params):
"""
Execute search and return raw dictionary result.
Args:
query (Query|dict): Query object or raw query dict
indices (list, optional): Indices to search
doc_types (list, optional): Document types to search
headers (dict, optional): Custom HTTP headers
**query_params: Additional parameters
Returns:
dict: Raw ElasticSearch response
"""
pass
from pyes import Search, TermQuery, BoolQuery, RangeQuery
# Simple term search
query = Search(TermQuery("tags", "python"))
results = es.search(query, indices=["blog"])
# Process results
print(f"Total hits: {results.total}")
for hit in results:
print(f"Title: {hit.title}")
print(f"Score: {hit._meta.score}")
print(f"Index: {hit._meta.index}")
# Complex boolean search
complex_query = Search(
BoolQuery(
must=[TermQuery("status", "published")],
should=[
TermQuery("tags", "python"),
TermQuery("tags", "elasticsearch")
],
must_not=[TermQuery("category", "draft")],
filter=RangeQuery("published_date", gte="2023-01-01")
)
).size(20).sort("published_date", order="desc")
results = es.search(complex_query, indices=["blog", "news"])
# Raw search for custom processing
raw_query = {
"query": {"match": {"title": "python"}},
"highlight": {"fields": {"title": {}, "content": {}}}
}
raw_results = es.search_raw(raw_query, indices=["blog"])

def search_multi(self, queries, indices_list=None, doc_types_list=None, **kwargs):
"""
Execute multiple search queries in a single request.
Args:
queries (list): List of query objects or dicts
indices_list (list, optional): List of indices for each query
doc_types_list (list, optional): List of doc types for each query
**kwargs: Additional parameters
Returns:
list: List of search results for each query
"""
pass
# Multiple searches in single request
queries = [
Search(TermQuery("tags", "python")),
Search(TermQuery("tags", "javascript")),
Search(RangeQuery("view_count", gte=1000))
]
indices_list = [["blog"], ["blog"], ["blog", "news"]]
results = es.search_multi(queries, indices_list)
for i, result in enumerate(results):
print(f"Query {i+1}: {result.total} hits")

def search_scroll(self, scroll_id, scroll="10m"):
"""
Continue scrolling through search results.
Args:
scroll_id (str): Scroll ID from previous search
scroll (str): Scroll timeout. Default: "10m"
Returns:
Next batch of search results
"""
pass
# Initial search with scroll
query = Search(TermQuery("status", "published")).size(1000)
results = es.search(query, indices=["blog"], scroll="5m")
all_docs = list(results) # First batch
# Continue scrolling for remaining results
while results.total > len(all_docs):
results = es.search_scroll(results._scroll_id, scroll="5m")
all_docs.extend(results)
if len(results) == 0: # No more results
break
print(f"Retrieved {len(all_docs)} total documents")
# Scan search for memory-efficient large result processing
query = Search(TermQuery("category", "products"))
results = es.search(query, indices=["catalog"], scan=True, scroll="2m", size=100)
for batch in results:
for doc in batch:
process_document(doc)  # Process each document

def count(self, query=None, indices=None, doc_types=None, **query_params):
"""
Count documents matching query.
Args:
query (Query|dict, optional): Query to count. Counts all if None
indices (list, optional): Indices to search
doc_types (list, optional): Document types to search
**query_params: Additional parameters
Returns:
int: Number of matching documents
"""
pass
def delete_by_query(self, indices, doc_types, query, **query_params):
"""
Delete documents matching query.
Args:
indices (list): Indices to delete from
doc_types (list): Document types to delete from
query (Query|dict): Query to match documents for deletion
**query_params: Additional parameters
Returns:
Deletion result information
"""
pass
# Count all documents
total_docs = es.count(indices=["blog"])
# Count with query
python_posts = es.count(TermQuery("tags", "python"), indices=["blog"])
# Delete old documents
old_query = RangeQuery("published_date", lt="2020-01-01")
deletion_result = es.delete_by_query(["blog"], ["post"], old_query)
print(f"Deleted {deletion_result.total} old posts")

def suggest(self, name, text, field, type='term', size=None, params=None, **kwargs):
"""
Get suggestions for text.
Args:
name (str): Suggestion name
text (str): Text to get suggestions for
field (str): Field to suggest on
type (str): Suggestion type ('term', 'phrase', 'completion'). Default: 'term'
size (int, optional): Number of suggestions to return
params (dict, optional): Additional suggestion parameters
**kwargs: Additional parameters (indices, etc.)
Returns:
Suggestion results
"""
pass
def suggest_from_object(self, suggest, indices=None, preference=None, **kwargs):
"""
Get suggestions from Suggest object.
Args:
suggest (Suggest): Suggest object with configured suggestions
indices (list, optional): Indices to suggest from
preference (str, optional): Node preference
**kwargs: Additional parameters
Returns:
Suggestion results
"""
pass
from pyes import Suggest
# Term suggestions for typos
suggestions = es.suggest("title_suggest", "pythno", "title", type="term")
# Phrase suggestions
phrase_suggestions = es.suggest("content_suggest", "elsticsearch", "content",
type="phrase", size=3)
# Multiple suggestions using Suggest object
suggest = Suggest()
suggest.add_term("python tutorial", "title_suggest", "title")
suggest.add_phrase("elasticsearch guide", "content_suggest", "content")
suggest.add_completion("py", "tag_suggest", "tags.suggest")
all_suggestions = es.suggest_from_object(suggest, indices=["blog"])
# Process suggestions
for suggestion_name, suggestion_results in all_suggestions.items():
print(f"Suggestions for {suggestion_name}:")
for option in suggestion_results[0].options:
print(f" - {option.text} (score: {option.score})")

def put_file(self, filename, index, doc_type, id=None, name=None):
"""
Index a file as attachment.
Args:
filename (str): Path to file to index
index (str): Index name
doc_type (str): Document type
id (str, optional): Document ID. Auto-generated if None
name (str, optional): Name for the attachment
Returns:
str: Document ID
"""
pass
def get_file(self, index, doc_type, id=None):
"""
Retrieve an indexed file.
Args:
index (str): Index name
doc_type (str): Document type
id (str, optional): Document ID
Returns:
File content and metadata
"""
pass
# Index PDF file
doc_id = es.put_file("/path/to/document.pdf", "documents", "attachment",
name="Important Document")
# Index with metadata
import os
from pyes import file_to_attachment
with open("/path/to/document.pdf", "rb") as f:
attachment = file_to_attachment(f.read(), "document.pdf")
doc = {
"title": "Important Document",
"uploaded_by": "john.doe",
"upload_date": "2023-12-01",
"file": attachment
}
es.index(doc, "documents", "attachment", id="doc-001")
# Retrieve file
file_doc = es.get_file("documents", "attachment", "doc-001")
print(f"File name: {file_doc.file.title}")
print(f"File size: {len(file_doc.file.content)} bytes")

def create_percolator(self, index, name, query, **kwargs):
"""
Create a percolator query.
Args:
index (str): Index name
name (str): Percolator name
query (Query|dict): Query to register
**kwargs: Additional parameters
Returns:
Creation result
"""
pass
def delete_percolator(self, index, name):
"""
Delete a percolator query.
Args:
index (str): Index name
name (str): Percolator name
Returns:
Deletion result
"""
pass
def percolate(self, index, doc_types, query):
"""
Test document against registered percolator queries.
Args:
index (str): Index name
doc_types (list): Document types
query (dict): Document to test
Returns:
Matching percolator queries
"""
pass
# Register percolator queries for content filtering
python_query = TermQuery("tags", "python")
es.create_percolator("blog", "python_posts", python_query)
tutorial_query = BoolQuery(
must=[TermQuery("category", "tutorial")],
should=[TermQuery("difficulty", "beginner")]
)
es.create_percolator("blog", "beginner_tutorials", tutorial_query)
# Test document against percolators
test_doc = {
"title": "Python Basics Tutorial",
"tags": ["python", "programming"],
"category": "tutorial",
"difficulty": "beginner"
}
matches = es.percolate("blog", ["post"], {"doc": test_doc})
print(f"Matching queries: {[match.id for match in matches]}")
# Output: ['python_posts', 'beginner_tutorials']

def morelikethis(self, index, doc_type, id, fields, **query_params):
"""
Find documents similar to the specified document.
Args:
index (str): Index name
doc_type (str): Document type
id (str): Document ID to find similar documents for
fields (list): Fields to use for similarity calculation
**query_params: Additional MLT parameters (min_term_freq, max_query_terms, etc.)
Returns:
Similar documents
"""
pass
# Find similar blog posts
similar_posts = es.morelikethis(
"blog", "post", "tutorial-001",
fields=["title", "content", "tags"],
min_term_freq=1,
max_query_terms=12,
min_doc_freq=1,
stop_words=["the", "and", "or", "but"]
)
print(f"Found {similar_posts.total} similar posts:")
for post in similar_posts:
print(f" - {post.title} (score: {post._meta.score})")

@property
def mappings(self):
"""
Get Mapper instance for mapping management.
Returns:
Mapper: Mapping management instance
"""
pass
@property
def default_indices(self):
"""
Get default indices for operations.
Returns:
list: Default indices
"""
pass
@default_indices.setter
def default_indices(self, indices):
"""
Set default indices for operations.
Args:
indices (list): Default indices to use
"""
pass
@property
def bulk_size(self):
"""
Get current bulk operation size.
Returns:
int: Current bulk size
"""
pass
@bulk_size.setter
def bulk_size(self, size):
"""
Set bulk operation size.
Args:
size (int): New bulk size
"""
pass
# Configure default behavior
es.default_indices = ["blog", "news"] # Default to these indices
es.bulk_size = 1000 # Process 1000 operations per bulk
# Access mapping management
mapping = es.mappings
mapping.create_index_if_missing("new_index")
# Search using default indices (no need to specify)
results = es.search(TermQuery("status", "published"))  # Uses default_indices

from pyes import (
ElasticSearchException, IndexMissingException,
DocumentMissingException, DocumentAlreadyExistsException,
VersionConflictEngineException, BulkOperationException,
NoServerAvailable
)
try:
# Document operations with error handling
doc_id = es.index(document, "blog", "post", id="existing-id")
except DocumentAlreadyExistsException:
# Handle duplicate document
print("Document already exists, updating instead")
es.update("blog", "post", "existing-id", document=document)
except IndexMissingException:
# Create index and retry
print("Index missing, creating index")
es.indices.create_index("blog")
doc_id = es.index(document, "blog", "post", id="existing-id")
except VersionConflictEngineException as e:
# Handle version conflicts
print(f"Version conflict: {e}")
current_doc = es.get("blog", "post", "existing-id")
# Resolve conflict and retry with current version
except NoServerAvailable:
# Handle connection failures
print("No ElasticSearch servers available")
# Implement fallback or retry logic
except ElasticSearchException as e:
# Handle general ES exceptions
print(f"ElasticSearch error: {e}")

# Connection pooling for concurrent applications
import threading
class ESConnectionPool:
    """Simple thread-safe pool of ES client connections.

    Pre-creates ``pool_size`` clients so worker threads can borrow and
    return connections without paying the connection-setup cost each time.

    Args:
        servers (list): ElasticSearch server address(es) passed to ES().
        pool_size (int): Number of connections created up front. Default: 10
    """

    def __init__(self, servers, pool_size=10):
        self.servers = servers
        self.pool = []
        self.lock = threading.Lock()
        # Eagerly build the pool of configured clients.
        for _ in range(pool_size):
            es = ES(servers, timeout=30, max_retries=3)
            self.pool.append(es)

    def get_connection(self):
        """Borrow a connection; create a fresh one if the pool is empty.

        NOTE(review): extra connections created here are later appended by
        return_connection, so the pool can grow beyond pool_size under load
        — confirm this is acceptable for the intended use.
        """
        with self.lock:
            if self.pool:
                return self.pool.pop()
            else:
                return ES(self.servers)

    def return_connection(self, es):
        """Return a borrowed connection to the pool for reuse."""
        with self.lock:
            self.pool.append(es)
# Use connection pool
pool = ESConnectionPool(['server1:9200', 'server2:9200'])
def worker_function():
    """Example worker: borrow a pooled connection, search, then return it.

    Relies on module-level ``pool`` (ESConnectionPool) and ``query``
    (a Search/Query object) being defined by the caller's setup code.
    """
    es = pool.get_connection()
    try:
        # Perform operations
        results = es.search(query, indices=["data"])
        # Process results
    finally:
        # Always hand the connection back, even if the search raised.
        pool.return_connection(es)
pool.return_connection(es)

# Efficient bulk processing with error handling
def bulk_index_documents(es, documents, index, doc_type):
    """Efficiently index large numbers of documents via bulk operations.

    Args:
        es (ES): Connected ES client instance.
        documents (iterable): Documents (dicts) to index.
        index (str): Target index name.
        doc_type (str): Target document type.

    Returns:
        list: (position, document, error-message) tuples for documents
        that could not be queued for indexing.
    """
    es.bulk_size = 1000  # Optimize batch size
    failed_docs = []
    try:
        for i, doc in enumerate(documents):
            try:
                es.index(doc, index, doc_type, bulk=True)
                # Force a flush after every 10,000 queued documents.
                # Uses (i + 1) so the very first document does not trigger
                # a pointless flush of a one-item buffer.
                if (i + 1) % 10000 == 0:
                    es.flush_bulk()
            except Exception as e:
                # Record the failure but keep processing remaining docs.
                failed_docs.append((i, doc, str(e)))
        # Final flush for any operations still buffered.
        es.flush_bulk()
    except BulkOperationException as e:
        # Surface failures reported by the bulk machinery itself.
        for failure in e.errors:
            print(f"Bulk failure: {failure}")
    return failed_docs
# Usage
documents = [{"title": f"Doc {i}", "content": f"Content {i}"} for i in range(50000)]
failures = bulk_index_documents(es, documents, "bulk_index", "document")
if failures:
print(f"Failed to index {len(failures)} documents")
# Implement retry logic for failures

The ES client provides comprehensive functionality for all ElasticSearch operations with robust error handling, flexible configuration options, and performance optimization features.
Install with Tessl CLI
npx tessl i tessl/pypi-pyes