Python Elastic Search driver providing a pythonic interface for interacting with ElasticSearch clusters
—
Bulk operations in PyES provide high-performance batch processing for indexing, updating, and deleting large numbers of documents. Rather than sending individual requests to ElasticSearch, bulk operations combine multiple operations into single requests, dramatically improving throughput and reducing network overhead. This is essential for applications that need to process large volumes of data efficiently.
class ES:
    """
    Main ES class bulk operation methods.

    Interface stubs documenting the bulk-related surface of the PyES
    ``ES`` client: buffering operations, flushing the buffer, and the
    ``bulk_size`` threshold that triggers automatic flushes.
    """

    def index_raw_bulk(self, header: dict, document: dict):
        """
        Add raw bulk operation to buffer.

        Args:
            header (dict): Bulk operation header (index, update, delete)
            document (dict, optional): Document body (not needed for delete)
        """
        pass

    def flush_bulk(self, forced: bool = False):
        """
        Flush buffered bulk operations to ElasticSearch.

        Args:
            forced (bool): Force flush even if bulk_size not reached. Default: False

        Returns:
            Bulk operation results

        Raises:
            BulkOperationException: If bulk operation fails
        """
        pass

    def force_bulk(self):
        """
        Force immediate flush of all buffered bulk operations.

        Returns:
            Bulk operation results
        """
        pass

    def create_bulker(self):
        """
        Create new bulker instance for managing bulk operations.

        Returns:
            Bulker: New bulker instance
        """
        pass

    @property
    def bulk_size(self) -> int:
        """
        Get current bulk size setting.

        Returns:
            int: Current bulk size
        """
        pass

    @bulk_size.setter
    def bulk_size(self, size: int):
        """
        Set bulk size for automatic flushing.

        Args:
            size (int): Number of operations per bulk request
        """
        pass
# Basic bulk operations setup
from pyes import ES

es = ES('localhost:9200')

# Configure bulk processing: one bulk request per 1000 buffered operations
es.bulk_size = 1000

# Bulk indexing with automatic flushing: each index() call with bulk=True
# only buffers the document; the client flushes whenever bulk_size is reached.
for i in range(5000):
    es.index(
        {"title": f"Document {i}", "content": f"Content for document {i}"},
        "test_index",
        "doc",
        id=str(i),
        bulk=True,
    )
# Manual flush for remaining documents
es.flush_bulk(forced=True)

def bulk_index_documents(es, documents, index, doc_type):
    """
    Efficiently index large numbers of documents using bulk operations.

    Args:
        es (ES): ElasticSearch client instance
        documents (list): List of documents to index
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Bulk operation results and statistics. Always contains
        ``total_docs``, ``processed``, ``errors`` and ``start_time``; on
        success also ``end_time``, ``duration`` and ``docs_per_second``.
    """
    # Configure bulk settings for optimal performance
    es.bulk_size = 1000
    stats = {
        "total_docs": len(documents),
        "processed": 0,
        "errors": [],
        "start_time": time.time()
    }
    try:
        for i, doc in enumerate(documents):
            # Buffer the document; the client auto-flushes every bulk_size docs
            es.index(doc, index, doc_type, id=doc.get('id', str(i)), bulk=True)
            stats["processed"] += 1
            # Progress reporting
            if stats["processed"] % 10000 == 0:
                print(f"Processed {stats['processed']}/{stats['total_docs']} documents")
        # Flush any remaining documents
        es.flush_bulk(forced=True)
        stats["end_time"] = time.time()
        stats["duration"] = stats["end_time"] - stats["start_time"]
        # Fix: guard against a zero-length interval (clock resolution on
        # small/fast batches), which previously raised ZeroDivisionError.
        if stats["duration"] > 0:
            stats["docs_per_second"] = stats["processed"] / stats["duration"]
        else:
            stats["docs_per_second"] = float(stats["processed"])
        return stats
    except Exception as e:
        stats["errors"].append(str(e))
        return stats
# Large dataset indexing example
import time

# Generate sample documents
documents = []
for i in range(50000):
    doc = {
        "id": i,
        "title": f"Document {i}",
        "content": f"This is the content of document number {i}",
        "category": f"category_{i % 10}",   # 10 rotating categories
        "timestamp": int(time.time()) + i,  # increasing epoch seconds
        "views": i * 10,
        "rating": (i % 5) + 1               # ratings cycle 1..5
    }
    documents.append(doc)

# Bulk index all documents
results = bulk_index_documents(es, documents, "bulk_test", "document")
print(f"Indexed {results['processed']} documents in {results['duration']:.2f} seconds")
print(f"Throughput: {results['docs_per_second']:.2f} docs/second")

def bulk_update_documents(es, updates, index, doc_type):
    """
    Perform bulk updates on documents.

    Each entry in *updates* carries an ``id`` plus either a ``script``
    (optionally with ``params``) for scripted updates, or a ``doc`` for
    partial-document updates.

    Args:
        es (ES): ElasticSearch client instance
        updates (list): List of update operations
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Update results and statistics
    """
    stats = {"updated": 0, "errors": []}
    try:
        for op in updates:
            target_id = op["id"]
            if "script" in op:
                # Script-based update
                es.update(index, doc_type, target_id,
                          script=op["script"],
                          params=op.get("params", {}),
                          bulk=True)
            else:
                # Partial-document update
                es.partial_update(index, doc_type, target_id,
                                  doc=op["doc"],
                                  bulk=True)
            stats["updated"] += 1
        # Push every buffered update to the cluster
        es.flush_bulk(forced=True)
    except Exception as e:
        stats["errors"].append(str(e))
    return stats
# Bulk update examples
update_operations = [
    {
        "id": "1",
        # Scripted update: increment the stored view counter
        "script": "ctx._source.views += params.increment",
        "params": {"increment": 10}
    },
    {
        "id": "2",
        # Partial-document update: merge these fields into the source
        "doc": {"category": "updated_category", "last_modified": "2023-12-01"}
    },
    {
        "id": "3",
        # Scripted update: raise rating to at least min_rating
        "script": "ctx._source.rating = Math.max(ctx._source.rating, params.min_rating)",
        "params": {"min_rating": 3}
    }
]
update_results = bulk_update_documents(es, update_operations, "bulk_test", "document")
print(f"Updated {update_results['updated']} documents")

def bulk_delete_documents(es, doc_ids, index, doc_type):
    """
    Perform bulk deletion of documents.

    Args:
        es (ES): ElasticSearch client instance
        doc_ids (list): List of document IDs to delete
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Deletion results with ``deleted``, ``not_found`` and
        ``errors`` entries
    """
    stats = {"deleted": 0, "not_found": 0, "errors": []}
    try:
        for target_id in doc_ids:
            es.delete(index, doc_type, target_id, bulk=True)
            stats["deleted"] += 1
        # Flush bulk deletions and inspect the per-item responses
        results = es.flush_bulk(forced=True)
        items = results["items"] if results and "items" in results else []
        for item in items:
            outcome = item.get("delete")
            # A 404 means the document was already absent: reclassify it
            if outcome is not None and outcome.get("status") == 404:
                stats["not_found"] += 1
                stats["deleted"] -= 1
    except Exception as e:
        stats["errors"].append(str(e))
    return stats
# Bulk deletion example
doc_ids_to_delete = [str(i) for i in range(1000, 2000)]  # Delete docs 1000-1999
deletion_results = bulk_delete_documents(es, doc_ids_to_delete, "bulk_test", "document")
print(f"Deleted: {deletion_results['deleted']}")
print(f"Not found: {deletion_results['not_found']}")
print(f"Errors: {len(deletion_results['errors'])}")

def mixed_bulk_operations(es, operations):
    """
    Execute mixed bulk operations (index, update, delete) in a single batch.

    Args:
        es (ES): ElasticSearch client instance
        operations (list): List of operation dictionaries; each needs an
            ``operation`` key of ``"index"``, ``"update"`` or ``"delete"``
            plus the fields that operation requires

    Returns:
        dict: Per-type operation counts and an ``errors`` list
    """
    stats = {
        "index_ops": 0,
        "update_ops": 0,
        "delete_ops": 0,
        "errors": []
    }
    try:
        for op in operations:
            op_type = op["operation"]
            if op_type == "index":
                es.index(op["doc"], op["index"], op["type"],
                         id=op.get("id"), bulk=True)
                stats["index_ops"] += 1
            elif op_type == "update":
                if "script" in op:
                    es.update(op["index"], op["type"], op["id"],
                              script=op["script"],
                              params=op.get("params", {}),
                              bulk=True)
                else:
                    es.partial_update(op["index"], op["type"], op["id"],
                                      doc=op["doc"], bulk=True)
                stats["update_ops"] += 1
            elif op_type == "delete":
                es.delete(op["index"], op["type"], op["id"], bulk=True)
                stats["delete_ops"] += 1
            else:
                # Fix: unknown operation types were previously dropped
                # silently; record them so callers can detect bad input.
                stats["errors"].append({
                    "action": op_type,
                    "id": op.get("id"),
                    "error": "unknown operation type"
                })
        # Execute all buffered operations in one request
        results = es.flush_bulk(forced=True)
        # Surface per-item failures reported by the bulk response
        if results and "errors" in results and results["errors"]:
            for item in results.get("items", []):
                for action, result in item.items():
                    if "error" in result:
                        stats["errors"].append({
                            "action": action,
                            "id": result.get("_id"),
                            "error": result["error"]
                        })
    except Exception as e:
        stats["errors"].append({"general_error": str(e)})
    return stats
# Mixed operations example
mixed_ops = [
    # Index new documents
    {
        "operation": "index",
        "index": "mixed_test",
        "type": "doc",
        "id": "new_1",
        "doc": {"title": "New Document 1", "status": "active"}
    },
    {
        "operation": "index",
        "index": "mixed_test",
        "type": "doc",
        "id": "new_2",
        "doc": {"title": "New Document 2", "status": "active"}
    },
    # Update existing documents
    {
        "operation": "update",
        "index": "mixed_test",
        "type": "doc",
        "id": "existing_1",
        "doc": {"last_updated": "2023-12-01", "status": "updated"}
    },
    {
        "operation": "update",
        "index": "mixed_test",
        "type": "doc",
        "id": "existing_2",
        # Scripted update (no "doc" key): bump the view counter
        "script": "ctx._source.view_count += 1"
    },
    # Delete documents
    {
        "operation": "delete",
        "index": "mixed_test",
        "type": "doc",
        "id": "old_1"
    },
    {
        "operation": "delete",
        "index": "mixed_test",
        "type": "doc",
        "id": "old_2"
    }
]
mixed_results = mixed_bulk_operations(es, mixed_ops)
print(f"Index operations: {mixed_results['index_ops']}")
print(f"Update operations: {mixed_results['update_ops']}")
print(f"Delete operations: {mixed_results['delete_ops']}")
print(f"Errors: {len(mixed_results['errors'])}")

def bulk_upsert_documents(es, upsert_ops, index, doc_type):
    """
    Perform bulk upsert operations (update if exists, create if not).

    Args:
        es (ES): ElasticSearch client instance
        upsert_ops (list): List of upsert operations
        index (str): Target index
        doc_type (str): Document type

    Returns:
        dict: Upsert results
    """
    stats = {"upserted": 0, "errors": []}
    try:
        for entry in upsert_ops:
            fallback_doc = entry.get("doc", {})
            # update-with-upsert: the upsert body is indexed verbatim when
            # the target document does not exist yet
            es.update(
                index=index,
                doc_type=doc_type,
                id=entry["id"],
                document=fallback_doc,
                upsert=entry.get("upsert", fallback_doc),
                script=entry.get("script"),
                params=entry.get("params", {}),
                bulk=True
            )
            stats["upserted"] += 1
        # Flush buffered upserts
        es.flush_bulk(forced=True)
    except Exception as e:
        stats["errors"].append(str(e))
    return stats
# Upsert examples
upsert_operations = [
    {
        "id": "user_123",
        # "doc" is applied when the document exists; "upsert" is indexed otherwise
        "doc": {"name": "John Doe", "last_seen": "2023-12-01"},
        "upsert": {"name": "John Doe", "created": "2023-12-01", "last_seen": "2023-12-01"}
    },
    {
        "id": "user_456",
        # Scripted upsert: increment visit_count, initialising it on first use
        "script": "if (ctx._source.containsKey('visit_count')) { ctx._source.visit_count += 1 } else { ctx._source.visit_count = 1 }",
        "upsert": {"visit_count": 1, "created": "2023-12-01"}
    }
]
upsert_results = bulk_upsert_documents(es, upsert_operations, "users", "profile")

class StreamingBulkProcessor:
    """
    Streaming bulk processor for continuous data ingestion.

    Provides automatic batching, error handling, and performance monitoring
    for high-volume data streams.
    """

    def __init__(self, es_client, bulk_size=1000, flush_interval=30,
                 max_retries=3, retry_delay=5):
        """
        Initialize StreamingBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            bulk_size (int): Documents per batch. Default: 1000
            flush_interval (int): Auto-flush interval in seconds. Default: 30
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.buffer = []
        # Fix: record construction time. get_stats() previously read a
        # self.start_time attribute that was never assigned, so the
        # reported "uptime" was always ~0 via its getattr() fallback.
        self.start_time = time.time()
        self.last_flush = self.start_time
        self.stats = {
            "processed": 0,
            "errors": 0,
            "retries": 0,
            "flushes": 0
        }

    def add_document(self, doc, index, doc_type, doc_id=None, operation="index"):
        """
        Add document to processing buffer.

        Args:
            doc (dict): Document to process
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID is generated
                when omitted
            operation (str): Operation type (index, update, delete). Default: "index"
        """
        self.buffer.append({
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        })
        # Auto-flush if buffer is full or time interval exceeded
        if (len(self.buffer) >= self.bulk_size or
                time.time() - self.last_flush > self.flush_interval):
            self.flush()

    def flush(self):
        """Flush buffered operations to ElasticSearch, retrying on failure."""
        if not self.buffer:
            return
        retry_count = 0
        success = False
        while not success and retry_count <= self.max_retries:
            try:
                # Stage each buffered operation on the client's bulk buffer
                for op in self.buffer:
                    if op["operation"] == "index":
                        self.es.index(op["doc"], op["index"], op["type"],
                                      id=op["id"], bulk=True)
                    elif op["operation"] == "update":
                        self.es.partial_update(op["index"], op["type"],
                                               op["id"], doc=op["doc"], bulk=True)
                    elif op["operation"] == "delete":
                        self.es.delete(op["index"], op["type"], op["id"], bulk=True)
                # Execute bulk operations
                self.es.flush_bulk(forced=True)
                # Update statistics
                self.stats["processed"] += len(self.buffer)
                self.stats["flushes"] += 1
                # Clear buffer and update timestamp
                self.buffer.clear()
                self.last_flush = time.time()
                success = True
            except Exception as e:
                retry_count += 1
                self.stats["retries"] += 1
                if retry_count <= self.max_retries:
                    print(f"Bulk operation failed, retrying ({retry_count}/{self.max_retries}): {e}")
                    time.sleep(self.retry_delay)
                else:
                    self.stats["errors"] += len(self.buffer)
                    print(f"Bulk operation failed after {self.max_retries} retries: {e}")
                    # Could save failed operations to dead letter queue here
                    self.buffer.clear()

    def get_stats(self):
        """Get processor statistics, including live buffer and uptime figures."""
        return {
            **self.stats,
            "buffer_size": len(self.buffer),
            "last_flush": self.last_flush,
            "uptime": time.time() - self.start_time
        }

    def close(self):
        """Flush remaining operations and close processor."""
        self.flush()
# Usage example
import uuid
import time
import json  # NOTE(review): imported but not used in this example — confirm needed
processor = StreamingBulkProcessor(es, bulk_size=2000, flush_interval=60)
# Simulate continuous data stream
def simulate_data_stream():
    """Feed 100,000 synthetic events through the module-level processor."""
    for event_num in range(100000):
        event = {
            "timestamp": time.time(),
            "event_type": f"event_{event_num % 10}",
            "user_id": f"user_{event_num % 1000}",
            "data": {"value": event_num, "category": f"cat_{event_num % 5}"}
        }
        processor.add_document(event, "events", "event")
        # Report progress (and pause briefly) once per thousand events
        if event_num % 1000 == 0:
            snapshot = processor.get_stats()
            print(f"Processed: {snapshot['processed']}, Buffer: {snapshot['buffer_size']}")
            time.sleep(0.1)
# Close processor
processor.close()
final_stats = processor.get_stats()
print(f"Final stats: {final_stats}")
# Run simulation
# NOTE(review): the stream is started *after* processor.close() above —
# confirm this ordering is intended (close() only flushes, it does not
# prevent further use, but the example reads backwards).
simulate_data_stream()
import threading
import queue
import concurrent.futures
class ParallelBulkProcessor:
    """
    Parallel bulk processor for maximum throughput.

    Uses multiple threads to process bulk operations concurrently.

    NOTE(review): the ``self.stats`` counters are updated from several
    worker threads without a lock; counts may drift slightly under heavy
    contention — confirm whether exact counts are required.
    """

    def __init__(self, es_client, num_workers=4, bulk_size=1000, queue_size=10000):
        """
        Initialize ParallelBulkProcessor.

        Args:
            es_client (ES): ElasticSearch client
            num_workers (int): Number of worker threads. Default: 4
            bulk_size (int): Documents per bulk request. Default: 1000
            queue_size (int): Maximum queue size. Default: 10000
        """
        self.es = es_client
        self.num_workers = num_workers
        self.bulk_size = bulk_size
        # Thread-safe queue for operations
        self.operation_queue = queue.Queue(maxsize=queue_size)
        self.result_queue = queue.Queue()
        # Worker management
        self.workers = []
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        self.running = False
        # Statistics
        self.stats = {
            "queued": 0,
            "processed": 0,
            "errors": 0,
            "active_workers": 0
        }

    def start(self):
        """Start the parallel processor."""
        self.running = True
        # Start worker threads
        for i in range(self.num_workers):
            future = self.executor.submit(self._worker_loop, i)
            self.workers.append(future)

    def _worker_loop(self, worker_id):
        """Worker thread main loop: batch operations and process them."""
        batch = []
        while self.running or not self.operation_queue.empty():
            try:
                # Get operation from queue (with timeout so shutdown is seen)
                operation = self.operation_queue.get(timeout=1.0)
                batch.append(operation)
                # Process batch when full
                if len(batch) >= self.bulk_size:
                    self._process_batch(batch, worker_id)
                    batch = []
                self.operation_queue.task_done()
            except queue.Empty:
                # Drain the partial batch once a stop has been requested
                if batch and not self.running:
                    self._process_batch(batch, worker_id)
                    batch = []
                continue
            except Exception as e:
                self.stats["errors"] += 1
                print(f"Worker {worker_id} error: {e}")
        # Process final batch
        if batch:
            self._process_batch(batch, worker_id)

    def _process_batch(self, batch, worker_id):
        """Process a batch of operations on a worker-private client."""
        try:
            self.stats["active_workers"] += 1
            # Create separate ES client for this worker to avoid conflicts
            worker_es = ES(self.es.server)
            worker_es.bulk_size = len(batch)
            for op in batch:
                if op["operation"] == "index":
                    worker_es.index(op["doc"], op["index"], op["type"],
                                    id=op["id"], bulk=True)
                elif op["operation"] == "update":
                    worker_es.partial_update(op["index"], op["type"],
                                             op["id"], doc=op["doc"], bulk=True)
                elif op["operation"] == "delete":
                    worker_es.delete(op["index"], op["type"], op["id"], bulk=True)
            # Execute bulk operations
            results = worker_es.flush_bulk(forced=True)
            # Update statistics
            self.stats["processed"] += len(batch)
            # Queue results for monitoring
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": True,
                "timestamp": time.time()
            })
        except Exception as e:
            self.stats["errors"] += len(batch)
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": False,
                "error": str(e),
                "timestamp": time.time()
            })
        finally:
            self.stats["active_workers"] -= 1

    def add_operation(self, operation, index, doc_type, doc_id=None, doc=None):
        """
        Add operation to processing queue.

        Args:
            operation (str): Operation type (index, update, delete)
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID is generated
                when omitted
            doc (dict, optional): Document data
        """
        op = {
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        }
        self.operation_queue.put(op)
        self.stats["queued"] += 1

    def stop(self, timeout=60):
        """
        Stop the processor and wait for completion.

        Args:
            timeout (int): Maximum wait time in seconds. Default: 60
        """
        # Signal workers to stop
        self.running = False
        # Wait for queue to empty
        self.operation_queue.join()
        # Fix: Executor.shutdown() accepts no ``timeout`` argument — the
        # previous shutdown(wait=True, timeout=timeout) raised TypeError.
        # Bound the wait on the worker futures instead, then shut the
        # executor down without blocking a second time.
        concurrent.futures.wait(self.workers, timeout=timeout)
        self.executor.shutdown(wait=False)

    def get_stats(self):
        """Get current processing statistics."""
        return {
            **self.stats,
            "queue_size": self.operation_queue.qsize(),
            "result_queue_size": self.result_queue.qsize()
        }
# Usage example
def parallel_bulk_example():
    """Example of parallel bulk processing."""
    # Create parallel processor
    processor = ParallelBulkProcessor(es, num_workers=8, bulk_size=1500)
    # Start processing
    processor.start()
    try:
        # Add large number of operations
        for doc_num in range(100000):
            payload = {
                "id": doc_num,
                "title": f"Parallel Document {doc_num}",
                "content": f"Content for parallel processing test {doc_num}",
                "batch": doc_num // 1000,
                "timestamp": time.time()
            }
            processor.add_operation("index", "parallel_test", "doc", str(doc_num), payload)
            # Monitor progress
            if doc_num % 10000 == 0:
                progress = processor.get_stats()
                print(f"Queued: {progress['queued']}, Processed: {progress['processed']}, "
                      f"Active Workers: {progress['active_workers']}")
    finally:
        # Stop processor and wait for completion
        processor.stop(timeout=120)
        final_stats = processor.get_stats()
        print(f"Final stats: {final_stats}")
# Run parallel processing example
parallel_bulk_example()
from pyes import BulkOperationException
import logging
class RobustBulkProcessor:
    """
    Bulk processor with comprehensive error handling and recovery.

    Tracks error categories, keeps retryable failures for another attempt,
    and shunts permanent failures into a dead-letter queue.
    """

    def __init__(self, es_client, bulk_size=1000):
        """
        Args:
            es_client (ES): ElasticSearch client instance
            bulk_size (int): Documents per bulk request. Default: 1000
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.logger = logging.getLogger("bulk_processor")
        # Error tracking, keyed by broad failure category
        self.error_stats = {
            "version_conflicts": 0,
            "document_missing": 0,
            "index_missing": 0,
            "mapping_errors": 0,
            "timeout_errors": 0,
            "other_errors": 0
        }
        # Failed operations for retry
        self.failed_operations = []
        self.dead_letter_queue = []

    def process_with_error_handling(self, operations):
        """
        Process operations with comprehensive error handling.

        Args:
            operations (list): List of bulk operations

        Returns:
            dict: Processing results with error details
        """
        results = {
            "successful": 0,
            "failed": 0,
            "errors": [],
            "retry_needed": []
        }
        try:
            # Stage and execute the bulk operations
            for op in operations:
                self._add_operation_to_bulk(op)
            bulk_results = self.es.flush_bulk(forced=True)
            # Process per-item results for error handling
            if bulk_results and "items" in bulk_results:
                self._process_bulk_results(bulk_results["items"], results)
        except BulkOperationException as e:
            self.logger.error(f"Bulk operation exception: {e}")
            results["errors"].append({"type": "bulk_exception", "message": str(e)})
            # Handle specific bulk errors
            if hasattr(e, 'errors') and e.errors:
                for error_item in e.errors:
                    self._categorize_error(error_item, results)
        except Exception as e:
            self.logger.error(f"Unexpected error during bulk processing: {e}")
            results["errors"].append({"type": "unexpected", "message": str(e)})
        return results

    def _add_operation_to_bulk(self, op):
        """Add a single operation dict to the client's bulk buffer."""
        if op["operation"] == "index":
            self.es.index(op["doc"], op["index"], op["type"],
                          id=op["id"], bulk=True)
        elif op["operation"] == "update":
            self.es.partial_update(op["index"], op["type"], op["id"],
                                   doc=op["doc"], bulk=True)
        elif op["operation"] == "delete":
            self.es.delete(op["index"], op["type"], op["id"], bulk=True)

    def _process_bulk_results(self, items, results):
        """Process individual item results from the bulk response."""
        for item in items:
            for action, result in item.items():
                if "error" in result:
                    # Operation failed
                    results["failed"] += 1
                    error_info = {
                        "action": action,
                        "id": result.get("_id"),
                        "status": result.get("status"),
                        "error": result["error"]
                    }
                    # Categorize error for appropriate handling
                    if self._is_retryable_error(result):
                        results["retry_needed"].append(error_info)
                    else:
                        results["errors"].append(error_info)
                        self.dead_letter_queue.append(error_info)
                    self._update_error_stats(result["error"])
                else:
                    # Operation successful
                    results["successful"] += 1

    def _is_retryable_error(self, result):
        """Determine if an item-level error is transient and retryable."""
        error = result.get("error", {})
        error_type = error.get("type", "")
        status = result.get("status", 0)
        # Retryable conditions
        if status in [429, 503, 504]:  # Rate limited or service unavailable
            return True
        if error_type in ["timeout_exception", "connect_timeout_exception"]:
            return True
        if "circuit_breaking_exception" in error_type:
            return True
        return False

    def _categorize_error(self, error_item, results):
        """Categorize an error item for statistics and handling.

        Fix: previously duplicated `_update_error_stats` verbatim; it now
        delegates so the category rules live in one place. (``results`` is
        kept for interface compatibility; it was never used here.)
        """
        self._update_error_stats(error_item.get("error", {}))

    def _update_error_stats(self, error):
        """Update the per-category error counters from an error dict."""
        error_type = error.get("type", "")
        if "version_conflict" in error_type:
            self.error_stats["version_conflicts"] += 1
        elif "document_missing" in error_type:
            self.error_stats["document_missing"] += 1
        elif "index_not_found" in error_type:
            self.error_stats["index_missing"] += 1
        elif "mapper_parsing" in error_type:
            self.error_stats["mapping_errors"] += 1
        elif "timeout" in error_type:
            self.error_stats["timeout_errors"] += 1
        else:
            self.error_stats["other_errors"] += 1

    def retry_failed_operations(self, max_retries=3, retry_delay=5):
        """
        Retry operations that failed with retryable errors.

        NOTE(review): the entries stored for retry are error-info dicts
        (action/id/status), not the original operation dicts that
        ``process_with_error_handling`` expects — confirm callers repopulate
        ``failed_operations`` with real operations before relying on this.

        Args:
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5

        Returns:
            dict: Retry results
        """
        retry_results = {"successful_retries": 0, "permanent_failures": 0}
        for attempt in range(max_retries):
            if not self.failed_operations:
                break
            self.logger.info(f"Retry attempt {attempt + 1} for {len(self.failed_operations)} operations")
            # Retry failed operations
            retry_batch = self.failed_operations.copy()
            self.failed_operations.clear()
            results = self.process_with_error_handling(retry_batch)
            retry_results["successful_retries"] += results["successful"]
            # Failed retries go back to failed_operations
            self.failed_operations.extend(results["retry_needed"])
            if not self.failed_operations:
                break
            time.sleep(retry_delay)
        # Move permanently failed operations to dead letter queue
        retry_results["permanent_failures"] = len(self.failed_operations)
        self.dead_letter_queue.extend(self.failed_operations)
        self.failed_operations.clear()
        return retry_results

    def get_error_summary(self):
        """Get comprehensive error summary."""
        return {
            "error_stats": self.error_stats,
            "failed_operations": len(self.failed_operations),
            "dead_letter_queue": len(self.dead_letter_queue),
            "total_errors": sum(self.error_stats.values())
        }
# Usage example with error handling
def robust_bulk_processing_example():
    """Example of robust bulk processing with error handling."""
    processor = RobustBulkProcessor(es, bulk_size=1000)

    # Create problematic operations to test error handling
    operations = []
    for n in range(5000):
        # Mix of good and problematic operations
        if n % 100 == 0:
            # Version conflict (trying to update non-existent doc with version)
            op = {
                "operation": "update",
                "index": "test_index",
                "type": "doc",
                "id": f"conflict_{n}",
                "doc": {"field": "value"},
                "version": 99  # Will cause version conflict
            }
        elif n % 150 == 0:
            # Invalid document (missing required field)
            op = {
                "operation": "index",
                "index": "strict_index",  # Index with strict mapping
                "type": "doc",
                "id": str(n),
                "doc": {"invalid_field": "value"}  # Will cause mapping error
            }
        else:
            # Valid operation
            op = {
                "operation": "index",
                "index": "test_index",
                "type": "doc",
                "id": str(n),
                "doc": {"title": f"Document {n}", "content": f"Content {n}"}
            }
        operations.append(op)

    # Process with error handling
    results = processor.process_with_error_handling(operations)
    print(f"Successful operations: {results['successful']}")
    print(f"Failed operations: {results['failed']}")
    print(f"Operations needing retry: {len(results['retry_needed'])}")

    # Retry failed operations
    retry_results = processor.retry_failed_operations()
    print(f"Successful retries: {retry_results['successful_retries']}")
    print(f"Permanent failures: {retry_results['permanent_failures']}")

    # Error summary
    error_summary = processor.get_error_summary()
    print(f"Error summary: {error_summary}")
# Run robust processing example
robust_bulk_processing_example()

def optimize_bulk_performance():
    """Comprehensive bulk performance optimization strategies.

    Builds and returns a dict of helper functions covering bulk-size
    calculation, connection tuning, document preparation, memory-friendly
    batching, and index settings for bulk loads.
    """
    # 1. Optimal bulk size calculation
    def calculate_optimal_bulk_size(avg_doc_size_kb, available_memory_mb,
                                    network_latency_ms):
        """Calculate optimal bulk size based on system characteristics."""
        # Target: 5-15MB per bulk request
        target_bulk_mb = 10
        # Calculate based on document size
        docs_per_mb = 1024 / avg_doc_size_kb
        base_bulk_size = int(target_bulk_mb * docs_per_mb)
        # Adjust for available memory (use max 10% for bulk buffer)
        memory_limit = (available_memory_mb * 0.1 * 1024) / avg_doc_size_kb
        memory_limited_size = int(min(base_bulk_size, memory_limit))
        # Adjust for network latency
        if network_latency_ms > 100:
            # High latency: larger bulks to amortize network cost
            latency_adjusted = min(memory_limited_size * 2, 10000)
        elif network_latency_ms < 20:
            # Low latency: smaller bulks for faster feedback
            latency_adjusted = max(memory_limited_size // 2, 500)
        else:
            latency_adjusted = memory_limited_size
        return max(100, min(latency_adjusted, 10000))  # Reasonable bounds

    # 2. Connection optimization
    def optimize_es_connection():
        """Optimize ElasticSearch connection for bulk operations."""
        # NOTE(review): the keyword arguments below are assumed to be
        # supported by this pyes version — confirm against the ES() signature.
        optimized_es = ES(
            server=["es1.example.com:9200", "es2.example.com:9200"],  # Multiple nodes
            timeout=120.0,  # Longer timeout for bulk operations
            max_retries=3,  # Retry failed requests
            retry_time=30,  # Wait between retries
            bulk_size=5000,  # Optimized bulk size
            # Connection pooling (implementation-specific)
            connection_pool_size=10,
            connection_keep_alive=True
        )
        return optimized_es

    # 3. Document preparation optimization
    def prepare_documents_efficiently(raw_docs):
        """Efficiently prepare documents for bulk indexing."""
        prepared_docs = []
        # Batch process documents
        for doc in raw_docs:
            # Minimize document size: drop empty/None values and internals
            optimized_doc = {
                # Only include necessary fields
                k: v for k, v in doc.items()
                if v is not None and v != "" and k != "_internal"
            }
            # Optimize field values
            if "timestamp" in optimized_doc:
                # Use epoch time instead of ISO string (smaller)
                optimized_doc["timestamp"] = int(optimized_doc["timestamp"])
            # Compress large text fields if beneficial
            if "content" in optimized_doc and len(optimized_doc["content"]) > 1000:
                # Could implement compression here
                pass
            prepared_docs.append(optimized_doc)
        return prepared_docs

    # 4. Memory management
    def memory_efficient_bulk_processing(documents, es_client):
        """Process documents with memory efficiency."""
        import gc
        batch_size = 10000  # Process in memory-friendly batches
        total_processed = 0
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            # Process batch
            for doc in batch:
                es_client.index(doc, "optimized_index", "doc", bulk=True)
            # Flush and cleanup
            es_client.flush_bulk(forced=True)
            total_processed += len(batch)
            # Force garbage collection to free memory
            if total_processed % (batch_size * 5) == 0:
                gc.collect()
                print(f"Processed {total_processed}/{len(documents)} documents")
        return total_processed

    # 5. Index optimization for bulk operations
    def optimize_index_for_bulk_operations(es_client, index_name):
        """Optimize index settings for bulk operations."""
        # Temporarily disable refresh for faster indexing
        es_client.indices.update_settings(index_name, {
            "refresh_interval": "-1",  # Disable auto-refresh
            "number_of_replicas": 0,  # Reduce replicas during bulk load
            "translog.flush_threshold_size": "1gb",  # Larger translog
            "merge.policy.max_merged_segment": "5gb"  # Larger segments
        })
        # NOTE(review): the "original" settings returned here are hardcoded
        # assumptions rather than values read from the index — confirm they
        # match the index's real pre-bulk configuration.
        return {
            "refresh_interval": "1s",  # Original settings to restore
            "number_of_replicas": 1,
            "translog.flush_threshold_size": "512mb",
            "merge.policy.max_merged_segment": "5gb"
        }

    # 6. Post-bulk optimization
    def post_bulk_optimization(es_client, index_name, original_settings):
        """Restore optimal settings after bulk operations."""
        # Restore original settings
        es_client.indices.update_settings(index_name, original_settings)
        # Force refresh to make documents searchable
        es_client.indices.refresh(index_name)
        # Force merge to optimize segments
        es_client.indices.optimize(index_name, max_num_segments=1)

    # Expose the helpers as a named toolbox
    return {
        "calculate_bulk_size": calculate_optimal_bulk_size,
        "optimize_connection": optimize_es_connection,
        "prepare_documents": prepare_documents_efficiently,
        "memory_efficient_processing": memory_efficient_bulk_processing,
        "optimize_index": optimize_index_for_bulk_operations,
        "post_optimization": post_bulk_optimization
    }
# Example of comprehensive bulk optimization
def optimized_bulk_pipeline():
    """Complete optimized bulk processing pipeline."""
    # Get optimization functions
    toolbox = optimize_bulk_performance()

    # Calculate optimal bulk size (example values)
    optimal_size = toolbox["calculate_bulk_size"](
        avg_doc_size_kb=2.5,       # 2.5KB average document size
        available_memory_mb=8192,  # 8GB available memory
        network_latency_ms=50      # 50ms network latency
    )
    print(f"Calculated optimal bulk size: {optimal_size}")

    # Optimize ES connection
    tuned_es = toolbox["optimize_connection"]()
    tuned_es.bulk_size = optimal_size

    # Prepare index for bulk operations
    index_name = "optimized_bulk_index"
    original_settings = toolbox["optimize_index"](tuned_es, index_name)
    try:
        # Generate and prepare documents
        raw_documents = [
            {"id": seq, "title": f"Optimized Document {seq}",
             "content": f"Optimized content for document {seq}" * 10,
             "timestamp": time.time() + seq}
            for seq in range(100000)
        ]
        prepared_docs = toolbox["prepare_documents"](raw_documents)
        # Process with memory efficiency
        processed_count = toolbox["memory_efficient_processing"](
            prepared_docs, tuned_es
        )
        print(f"Successfully processed {processed_count} documents")
    finally:
        # Restore optimal settings
        toolbox["post_optimization"](tuned_es, index_name, original_settings)
optimized_bulk_pipeline()

PyES bulk operations provide powerful capabilities for high-performance data processing, with comprehensive support for error handling, parallel processing, and performance optimization to handle large-scale data ingestion efficiently.
Install with Tessl CLI
npx tessl i tessl/pypi-pyes