"""Python client for Elasticsearch with comprehensive API coverage and both sync and async support.

Utility functions for common Elasticsearch operations, including bulk indexing,
scanning large result sets, and reindexing data. These helpers simplify complex
operations and provide optimized implementations for common use cases.

``bulk`` provides efficient bulk indexing with automatic batching, error
handling, and progress tracking.
"""
def bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """Execute many index/update/delete actions as batched bulk requests.

    Args:
        client: Elasticsearch client used to send the requests.
        actions: Iterable of action dictionaries or plain documents.
        index: Fallback index applied to actions that omit one.
        doc_type: Fallback document type (deprecated).
        routing: Fallback routing value for all actions.
        pipeline: Ingest pipeline applied to indexed documents.
        refresh: Refresh policy forwarded to the bulk API.
        timeout: Per-request timeout string (e.g. "60s").
        chunk_size: Maximum number of actions sent per request.
        max_chunk_bytes: Maximum payload size per request, in bytes.
        thread_count: Number of parallel worker threads.
        queue_size: Depth of the thread-pool task queue.
        expand_action_callback: Hook that expands shorthand actions.
        raise_on_exception: Raise transport-level exceptions when True.
        raise_on_error: Raise on per-item API errors when True.
        ignore_status: HTTP status codes that are treated as success.
        **kwargs: Additional parameters forwarded to the bulk API.

    Returns:
        A (success_count, failed_operations) tuple.
    """
def streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """Lazily execute bulk actions, yielding results as chunks complete.

    Takes the same parameters as bulk().

    Yields:
        A (success, info) tuple for every chunk that is processed.
    """
def parallel_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    ignore_status=(),
    **kwargs
):
    """Parallel bulk indexing using multiple threads.

    Parameters: Same as bulk(), with ``thread_count`` and ``queue_size``
    controlling the worker thread pool.

    Yields:
        Tuples of (success, info) for each chunk processed
    """


# Efficiently iterate through large result sets using scroll API.
def scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """Scan and scroll through all documents matching a query.

    Args:
        client: Elasticsearch client instance.
        query: Query to execute (default: match_all).
        scroll: How long to keep the scroll context alive between requests.
        raise_on_error: Whether to raise on errors.
        preserve_order: Whether to preserve result ordering.
        size: Number of documents per shard per batch.
        request_timeout: Timeout for each underlying request.
        clear_scroll: Whether to clear the scroll context when done.
        scroll_kwargs: Additional arguments for scroll requests.
        **kwargs: Additional search parameters.

    Yields:
        Individual document hits.
    """


# Copy documents between indices with optional transformation.
def reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """Reindex documents from source to target index.

    Args:
        client: Source Elasticsearch client.
        source_index: Source index name.
        target_index: Target index name.
        query: Query to filter source documents.
        target_client: Target client (if different from source).
        chunk_size: Bulk operation chunk size.
        scroll: Scroll timeout for scanning the source index.
        scan_kwargs: Additional scan arguments.
        bulk_kwargs: Additional bulk arguments.
        transform_doc_callback: Function to transform documents in transit.

    Returns:
        Tuple of (success_count, failed_operations)
    """


# Async versions of helper functions for use with AsyncElasticsearch.
async def async_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """Asynchronous counterpart of bulk() for AsyncElasticsearch clients.

    Takes the same parameters as bulk().

    Returns:
        A (success_count, failed_operations) tuple.
    """
async def async_streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """Asynchronous counterpart of streaming_bulk().

    Takes the same parameters as streaming_bulk().

    Yields:
        A (success, info) tuple for every chunk that is processed.
    """
async def async_scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """Asynchronous counterpart of scan().

    Takes the same parameters as scan().

    Yields:
        Individual document hits.
    """
async def async_reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """Async version of reindex operation.

    Parameters: Same as reindex()

    Returns:
        Tuple of (success_count, failed_operations)
    """


# Additional utility functions for common operations.
def expand_action(action: Dict[str, Any]) -> Dict[str, Any]:
    """Expand a shorthand action dictionary to full format.

    Args:
        action: Action dictionary to expand.

    Returns:
        Expanded action dictionary.
    """


# Exception types specific to helper operations.
class BulkIndexError(Exception):
    """Raised when one or more operations in a bulk request fail.

    Attributes:
        errors: List of the individual operation errors.
    """

    def __init__(self, message: str, errors: List[Dict]): ...

    @property
    def errors(self) -> List[Dict]: ...
class ScanError(Exception):
    """Exception raised during scan operations.

    Attributes:
        scroll_id: Scroll context ID if available.
        partial_results: Results obtained before the error.
    """

    def __init__(self, message: str, scroll_id: Optional[str] = None): ...


from elasticsearch import Elasticsearch
# Import the sync bulk helper and its failure exception.
from elasticsearch.helpers import bulk, BulkIndexError
# Shared client used by the examples below; assumes a node at localhost:9200.
client = Elasticsearch(hosts=['http://localhost:9200'])
# Prepare documents for bulk indexing
def generate_docs():
    """Yield 10,000 example product actions for the bulk helper."""
    for doc_id in range(10000):
        source = {
            "name": f"Product {doc_id}",
            "price": doc_id * 10.99,
            "category": f"Category {doc_id % 5}",
        }
        yield {"_index": "products", "_id": doc_id, "_source": source}
try:
    # Bulk index documents
    success_count, failed_ops = bulk(
        client,
        generate_docs(),
        chunk_size=1000,
        timeout='60s',
        refresh='wait_for'
    )
    print(f"Successfully indexed: {success_count} documents")
    if failed_ops:
        print(f"Failed operations: {len(failed_ops)}")
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    # Each entry describes one failed operation.
    for error in e.errors:
        print(f"Error: {error}")

from elasticsearch.helpers import streaming_bulk
def generate_large_dataset():
    """Yield 100,000 synthetic log documents for streaming_bulk."""
    for i in range(100000):
        yield {
            "_index": "logs",
            "_source": {
                # i % 24 keeps the hour within 00-23; the bare counter
                # produced invalid timestamps such as "2024-01-01T99:00:00Z".
                "timestamp": f"2024-01-01T{i % 24:02d}:00:00Z",
                "level": "INFO" if i % 2 == 0 else "WARN",
                "message": f"Log message {i}",
                "service": f"service-{i % 10}"
            }
        }
# Stream bulk operations with progress tracking
total_indexed = 0
for success, info in streaming_bulk(
    client,
    generate_large_dataset(),
    chunk_size=5000,
    max_chunk_bytes=10 * 1024 * 1024  # 10MB chunks
):
    if success:
        total_indexed += 1
        if total_indexed % 10 == 0:  # Progress every 10 chunks
            print(f"Indexed {total_indexed * 5000} documents...")
    else:
        print(f"Failed chunk: {info}")
print(f"Total documents indexed: {total_indexed * 5000}")

from elasticsearch.helpers import scan
# Scan through all documents in an index
total_docs = 0
for doc in scan(
    client,
    index="products",
    query={"match_all": {}},
    scroll='2m',
    size=1000
):
    # Process each document
    total_docs += 1
    product_name = doc['_source']['name']
    product_price = doc['_source']['price']
    # Example: Update price with 10% discount
    if product_price > 100:
        client.update(
            index="products",
            id=doc['_id'],
            document={"price": product_price * 0.9}
        )
print(f"Processed {total_docs} documents")

# Scan with complex query and field filtering
query = {
    "bool": {
        "must": [
            {"range": {"price": {"gte": 50, "lte": 1000}}},
            {"term": {"status": "active"}}
        ],
        "must_not": [
            {"term": {"category.keyword": "discontinued"}}
        ]
    }
}
for doc in scan(
    client,
    index="products",
    query=query,
    _source=["name", "price", "category"],  # Only retrieve specific fields
    scroll='5m',
    size=2000,
    preserve_order=True
):
    print(f"Product: {doc['_source']['name']}, Price: {doc['_source']['price']}")

from elasticsearch.helpers import reindex
def transform_document(doc):
    """Transform documents during reindex.

    Adds a ``price_tier`` bucket, renames ``desc`` to ``description``,
    and stores the price as integer cents.
    """
    source = doc['_source']
    # Add new computed field
    source['price_tier'] = 'high' if source['price'] > 1000 else 'low'
    # Rename field
    if 'desc' in source:
        source['description'] = source.pop('desc')
    # Convert price to integer cents. round() instead of int(): float
    # truncation turned e.g. 10.99 * 100 (= 1098.999...) into 1098 cents.
    source['price_cents'] = round(source['price'] * 100)
    return doc
# Reindex with transformation
success_count, failed_ops = reindex(
    client,
    source_index="products_v1",
    target_index="products_v2",
    query={"term": {"status": "active"}},  # Only reindex active products
    transform_doc_callback=transform_document,
    chunk_size=1000
)
print(f"Reindexed {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")

from elasticsearch.helpers import parallel_bulk
import threading
def generate_docs_from_database():
    """Generate documents from database query."""
    # Simulate database connection and query
    for row in range(50000):
        source = {
            "user_id": row % 1000,
            "event": f"action_{row % 10}",
            "timestamp": "2024-01-01T00:00:00Z",
            "value": row * 1.5,
        }
        yield {"_index": "analytics", "_source": source}
# Use parallel bulk for high-throughput indexing
results = []
for success, info in parallel_bulk(
    client,
    generate_docs_from_database(),
    chunk_size=2000,
    thread_count=8,  # Use 8 parallel threads
    queue_size=8,
    timeout='120s'
):
    results.append((success, info))
    if len(results) % 100 == 0:
        print(f"Processed {len(results)} chunks")
successful_chunks = sum(1 for success, _ in results if success)
print(f"Successfully processed {successful_chunks} chunks")

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk, async_scan
async def async_bulk_example():
    """Example of async bulk operations."""
    async_client = AsyncElasticsearch(hosts=['http://localhost:9200'])

    # Prepare async documents
    async def async_generate_docs():
        for i in range(1000):
            yield {
                "_index": "async_test",
                "_id": i,
                "_source": {"value": i, "squared": i * i}
            }

    try:
        # Async bulk indexing
        success_count, failed_ops = await async_bulk(
            async_client,
            async_generate_docs(),
            chunk_size=100
        )
        print(f"Async indexed: {success_count} documents")
        # Async scanning
        total_scanned = 0
        async for doc in async_scan(
            async_client,
            index="async_test",
            query={"match_all": {}}
        ):
            total_scanned += 1
        print(f"Async scanned: {total_scanned} documents")
    finally:
        # Always release the client's connections, even on failure.
        await async_client.close()


# Run async example
asyncio.run(async_bulk_example())

# Install with Tessl CLI
# npx tessl i tessl/pypi-elasticsearch