Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.
---
High-performance operations for processing multiple documents efficiently. Includes bulk indexing, updates, deletions, and specialized helper functions for streaming and parallel processing of large datasets.
Execute multiple document operations (index, create, update, delete) in a single request for improved performance.
def bulk(body: list, index: str = None, doc_type: str = None, **params) -> dict:
    """
    Execute multiple document operations in a single request.

    Parameters:
    - body: List of operations (action/metadata and optional source)
    - index: Default index for operations without explicit index
    - doc_type: Default document type
    - pipeline: Default ingest pipeline
    - refresh: Control when changes are visible ('true', 'false', 'wait_for')
    - routing: Default routing value
    - timeout: Request timeout
    - wait_for_active_shards: Wait for N shards to be active

    Body format (list of alternating action lines and document lines):
    [
        {"index": {"_index": "my_index", "_type": "_doc", "_id": "1"}},
        {"title": "Document 1", "content": "Content here"},
        {"create": {"_index": "my_index", "_type": "_doc", "_id": "2"}},
        {"title": "Document 2", "content": "More content"},
        {"update": {"_index": "my_index", "_type": "_doc", "_id": "3"}},
        {"doc": {"title": "Updated Document 3"}},
        {"delete": {"_index": "my_index", "_type": "_doc", "_id": "4"}}
    ]

    Returns:
        dict: Bulk response with 'items' array containing results for each operation
    """

# Update multiple documents matching a query using scripts or partial updates.
def update_by_query(index: str, doc_type: str = None, body: dict = None, **params) -> dict:
    """
    Update documents matching a query.

    Parameters:
    - index: Index name(s) to update
    - doc_type: Document type(s)
    - body: Update specification with query and script
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle version conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - pipeline: Ingest pipeline for updated documents
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate (operations per second)
    - routing: Routing values
    - scroll: Scroll timeout for large updates
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to update
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions in response
    - version_type: Version type
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "query": {
            "term": {"status": "draft"}
        },
        "script": {
            "source": "ctx._source.status = 'published'; ctx._source.published_at = params.now",
            "params": {"now": "2023-01-01T12:00:00Z"}
        }
    }

    Returns:
        dict: Update results with 'updated', 'version_conflicts', 'took', etc.
    """

# Delete multiple documents matching a query efficiently.
def delete_by_query(index: str, body: dict, doc_type: str = None, **params) -> dict:
    """
    Delete documents matching a query.

    Parameters:
    - index: Index name(s) to delete from
    - body: Delete specification with query
    - doc_type: Document type(s)
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate
    - routing: Routing values
    - scroll: Scroll timeout
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to delete
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion

    Body structure:
    {
        "query": {
            "range": {
                "created_at": {
                    "lt": "2022-01-01"
                }
            }
        }
    }

    Returns:
        dict: Deletion results with 'deleted', 'took', 'version_conflicts', etc.
    """

# Copy and transform documents between indices with optional query filtering and script processing.
def reindex(body: dict, **params) -> dict:
    """
    Copy documents from source to destination index with optional transformation.

    Parameters:
    - body: Reindex specification with source and destination
    - refresh: Refresh destination index after operation
    - requests_per_second: Throttling rate
    - slices: Number of slices for parallel processing
    - timeout: Request timeout
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task

    Body structure:
    {
        "source": {
            "index": "source_index",
            "type": "_doc",                    # Optional
            "query": {                         # Optional filtering
                "term": {"status": "published"}
            },
            "_source": ["title", "content"],   # Optional field filtering
            "size": 1000                       # Batch size
        },
        "dest": {
            "index": "destination_index",
            "type": "_doc",                    # Optional
            "pipeline": "my_pipeline"          # Optional ingest pipeline
        },
        "script": {                            # Optional transformation
            "source": "ctx._source.new_field = ctx._source.old_field + '_transformed'"
        },
        "conflicts": "proceed"                 # Handle version conflicts
    }

    Returns:
        dict: Reindex results with 'created', 'updated', 'took', etc.
    """
def reindex_rethrottle(task_id: str = None, **params) -> dict:
    """
    Change throttling of a running reindex task.

    Parameters:
    - task_id: Task identifier from reindex operation
    - requests_per_second: New throttling rate

    Returns:
        dict: Updated task information
    """

# High-level helper utilities for common bulk operations with automatic
# batching, error handling, and progress tracking.
from elasticsearch5.helpers import bulk, streaming_bulk, parallel_bulk, scan, reindex
def bulk(client, actions, stats_only: bool = False, **kwargs) -> tuple:
    """
    Helper for bulk operations with automatic batching and error handling.

    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of document actions
    - stats_only: Return only success count (bool)
    - index: Default index name
    - doc_type: Default document type
    - chunk_size: Documents per batch (default 500)
    - max_chunk_bytes: Maximum batch size in bytes
    - thread_count: Number of threads for parallel processing
    - queue_size: Queue size for threading
    - expand_action_callback: Function to transform actions
    - refresh: Refresh after operation
    - request_timeout: Request timeout
    - max_retries: Maximum retry attempts
    - initial_backoff: Initial retry delay
    - max_backoff: Maximum retry delay

    Action format:
    {
        "_op_type": "index",  # or "create", "update", "delete"
        "_index": "my_index",
        "_type": "_doc",
        "_id": "document_id",
        "_source": {"field": "value"}  # For index/create
    }

    Returns:
        If stats_only=False: (success_count, list_of_errors)
        If stats_only=True: success_count

    Raises:
        BulkIndexError: If errors occurred and not ignored
    """
def streaming_bulk(client, actions, chunk_size: int = 500, **kwargs):
    """
    Generator for streaming bulk operations.

    Parameters: Same as bulk()

    Yields:
        (is_success: bool, action_result: dict) for each action
    """
def parallel_bulk(client, actions, thread_count: int = 4, **kwargs):
    """
    Generator for parallel bulk operations using multiple threads.

    Parameters: Same as bulk() plus thread_count

    Yields:
        (is_success: bool, action_result: dict) for each action
    """
def scan(client, query: dict = None, scroll: str = '5m', **kwargs):
    """
    Generator to efficiently scroll through all matching documents.

    Parameters:
    - client: Elasticsearch client
    - query: Search query (default match_all)
    - scroll: Scroll timeout
    - index: Index name(s)
    - doc_type: Document type(s)
    - raise_on_error: Raise on scroll errors
    - preserve_order: Maintain result ordering
    - size: Documents per scroll batch
    - request_timeout: Request timeout
    - clear_scroll: Clear scroll on completion

    Yields:
        Individual documents from search results
    """
def reindex(client, source_index: str, target_index: str, query: dict = None, **kwargs) -> tuple:
    """
    Helper to reindex documents between indices.

    Parameters:
    - client: Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Optional query to filter documents
    - target_client: Different client for target (for cross-cluster)
    - chunk_size: Documents per batch
    - scroll: Scroll timeout
    - scan_kwargs: Additional scan() parameters
    - bulk_kwargs: Additional bulk() parameters

    Returns:
        (success_count, list_of_errors)
    """

from elasticsearch5.helpers import BulkIndexError, ScanError
class BulkIndexError(Exception):
    """
    Exception for bulk operation failures.

    Attributes:
    - errors: List of failed actions with error details
    """
class ScanError(Exception):
    """
    Exception for scan operation failures.

    Attributes:
    - scroll_id: Scroll ID for potential resume
    """

from elasticsearch5 import Elasticsearch
es = Elasticsearch(['localhost:9200'])

# Prepare bulk operations
actions = [
    {
        "_op_type": "index",
        "_index": "articles",
        "_type": "_doc",
        "_id": "1",
        "_source": {"title": "Article 1", "content": "Content 1"}
    },
    {
        "_op_type": "create",
        "_index": "articles",
        "_type": "_doc",
        "_id": "2",
        "_source": {"title": "Article 2", "content": "Content 2"}
    },
    {
        "_op_type": "update",
        "_index": "articles",
        "_type": "_doc",
        "_id": "3",
        "_source": {"doc": {"title": "Updated Article 3"}}
    },
    {
        "_op_type": "delete",
        "_index": "articles",
        "_type": "_doc",
        "_id": "4"
    }
]

# Execute bulk operations
response = es.bulk(body=actions)

# Check for errors
if response['errors']:
    for item in response['items']:
        for operation, result in item.items():
            if 'error' in result:
                print(f"Error in {operation}: {result['error']}")
else:
    print(f"Successfully processed {len(response['items'])} operations")

from elasticsearch5.helpers import bulk, scan, reindex
# Generate documents
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "large_index",
            "_type": "_doc",
            "_id": str(i),
            "_source": {
                "title": f"Document {i}",
                "content": f"This is content for document {i}",
                "category": "bulk_test"
            }
        }

# Bulk index with helper
try:
    success, failed = bulk(es, generate_docs(), chunk_size=1000)
    print(f"Successfully indexed {success} documents")
    if failed:
        print(f"Failed to index {len(failed)} documents")
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")

from elasticsearch5.helpers import streaming_bulk
# Process large dataset with streaming
def process_large_dataset():
    for is_success, result in streaming_bulk(es, generate_docs(), chunk_size=500):
        if not is_success:
            print(f"Failed to process: {result}")
        else:
            # Process successful result
            pass

process_large_dataset()

# Update all draft articles to published
update_body = {
    "query": {
        "term": {"status": "draft"}
    },
    "script": {
        "source": """
ctx._source.status = 'published';
ctx._source.published_at = params.now;
ctx._source.publish_count = (ctx._source.publish_count ?: 0) + 1
""",
        "params": {
            "now": "2023-01-01T12:00:00Z"
        }
    }
}

response = es.update_by_query(
    index='articles',
    body=update_body,
    conflicts='proceed',  # Continue on version conflicts
    refresh=True
)

print(f"Updated {response['updated']} documents")
print(f"Version conflicts: {response.get('version_conflicts', 0)}")

# Reindex with field transformation and filtering
reindex_body = {
    "source": {
        "index": "old_articles",
        "query": {
            "range": {
                "created_at": {
                    "gte": "2022-01-01"
                }
            }
        }
    },
    "dest": {
        "index": "new_articles",
        "pipeline": "article_enrichment"
    },
    "script": {
        "source": """
// Transform old format to new format
ctx._source.slug = ctx._source.title.toLowerCase().replaceAll('[^a-z0-9]+', '-');
ctx._source.word_count = ctx._source.content.split(' ').length;
ctx._source.migrated_at = '2023-01-01T00:00:00Z';
"""
    }
}

response = es.reindex(
    body=reindex_body,
    wait_for_completion=False,  # Run as task
    slices='auto'  # Parallel processing
)

if 'task' in response:
    task_id = response['task']
    print(f"Reindex started as task: {task_id}")
    # Check task status later
    task_status = es.tasks.get(task_id=task_id)
    print(f"Task status: {task_status}")

from elasticsearch5.helpers import scan
# Process all documents in an index
query = {
    "query": {
        "range": {
            "created_at": {
                "gte": "2023-01-01"
            }
        }
    }
}

# Scan through all matching documents
processed_count = 0
for doc in scan(es, query=query, index='large_index', size=1000):
    # Process each document
    process_document(doc['_source'])
    processed_count += 1
    if processed_count % 10000 == 0:
        print(f"Processed {processed_count} documents")

print(f"Total processed: {processed_count} documents")

# Install with Tessl CLI
npx tessl i tessl/pypi-elasticsearch5