CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyes

Python ElasticSearch driver providing a Pythonic interface for interacting with ElasticSearch clusters

Pending
Overview
Eval results
Files

bulk-operations.mddocs/

PyES Bulk Operations

Overview

Bulk operations in PyES provide high-performance batch processing for indexing, updating, and deleting large numbers of documents. Rather than sending individual requests to ElasticSearch, bulk operations combine multiple operations into single requests, dramatically improving throughput and reducing network overhead. This is essential for applications that need to process large volumes of data efficiently.

Core Bulk Classes

ES Bulk Methods

class ES:
    """
    Main ES class bulk operation methods.

    API reference stub: method bodies are intentionally ``pass``. Calls
    made elsewhere with ``bulk=True`` accumulate operations in an internal
    buffer; the buffer is sent as a single ``_bulk`` request when it
    reaches ``bulk_size`` or when a flush is forced.
    """
    
    def index_raw_bulk(self, header, document):
        """
        Add a pre-serialized raw bulk operation to the buffer.
        
        Args:
            header (dict): Bulk operation header (index, update, delete)
            document (dict, optional): Document body (not needed for delete)
        """
        pass
    
    def flush_bulk(self, forced=False):
        """
        Flush buffered bulk operations to ElasticSearch.
        
        Args:
            forced (bool): Force flush even if bulk_size not reached. Default: False
            
        Returns:
            Bulk operation results
            
        Raises:
            BulkOperationException: If bulk operation fails
        """
        pass
    
    def force_bulk(self):
        """
        Force immediate flush of all buffered bulk operations.
        
        Returns:
            Bulk operation results
        """
        pass
    
    def create_bulker(self):
        """
        Create new bulker instance for managing bulk operations.
        
        Returns:
            Bulker: New bulker instance
        """
        pass
    
    @property
    def bulk_size(self):
        """
        Get current bulk size setting.
        
        Returns:
            int: Current bulk size (operations per automatic flush)
        """
        pass
    
    @bulk_size.setter
    def bulk_size(self, size):
        """
        Set bulk size for automatic flushing.
        
        Args:
            size (int): Number of operations per bulk request
        """
        pass

# Basic bulk operations setup: connect and pick a batch size.
from pyes import ES

es = ES('localhost:9200')
es.bulk_size = 1000  # client auto-flushes after 1000 buffered operations

# Queue 5000 documents; the client flushes each time the buffer fills.
for doc_num in range(5000):
    es.index(
        {"title": f"Document {doc_num}", "content": f"Content for document {doc_num}"},
        "test_index",
        "doc",
        id=str(doc_num),
        bulk=True,
    )

# Push whatever is still buffered (the final partial batch).
es.flush_bulk(forced=True)

Bulk Document Operations

Bulk Indexing

def bulk_index_documents(es, documents, index, doc_type):
    """
    Efficiently index large numbers of documents using bulk operations.

    Args:
        es (ES): ElasticSearch client instance
        documents (list): List of documents to index; a document's ``id``
            field is used as its document ID when present, otherwise its
            position in the list is used.
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Bulk operation results and statistics. The timing fields
        (``end_time``, ``duration``, ``docs_per_second``) are always
        present, even when an error aborted the run.
    """

    # Configure bulk settings for optimal performance
    es.bulk_size = 1000

    stats = {
        "total_docs": len(documents),
        "processed": 0,
        "errors": [],
        "start_time": time.time()
    }

    try:
        for i, doc in enumerate(documents):
            # Add document to bulk buffer; the client flushes at bulk_size.
            es.index(doc, index, doc_type, id=doc.get('id', str(i)), bulk=True)
            stats["processed"] += 1

            # Progress reporting
            if stats["processed"] % 10000 == 0:
                print(f"Processed {stats['processed']}/{stats['total_docs']} documents")

        # Flush any remaining documents
        es.flush_bulk(forced=True)

    except Exception as e:
        stats["errors"].append(str(e))
    finally:
        # Compute timing on every path: previously the error path returned
        # without these keys, so callers reading stats["duration"] crashed.
        stats["end_time"] = time.time()
        stats["duration"] = stats["end_time"] - stats["start_time"]
        # Guard against division by zero on very fast (or empty) runs.
        stats["docs_per_second"] = (
            stats["processed"] / stats["duration"] if stats["duration"] > 0 else 0.0
        )

    return stats

# Large dataset indexing example
import time

# Generate 50k sample documents with varied categories and ratings.
documents = [
    {
        "id": n,
        "title": f"Document {n}",
        "content": f"This is the content of document number {n}",
        "category": f"category_{n % 10}",
        "timestamp": int(time.time()) + n,
        "views": n * 10,
        "rating": (n % 5) + 1,
    }
    for n in range(50000)
]

# Bulk index all documents and report throughput.
results = bulk_index_documents(es, documents, "bulk_test", "document")
print(f"Indexed {results['processed']} documents in {results['duration']:.2f} seconds")
print(f"Throughput: {results['docs_per_second']:.2f} docs/second")

Bulk Updates

def bulk_update_documents(es, updates, index, doc_type):
    """
    Apply a batch of update operations through the bulk buffer.

    Each entry in *updates* carries an ``id`` plus either a ``script``
    (with optional ``params``) or a ``doc`` of partial fields to merge.

    Args:
        es (ES): ElasticSearch client instance
        updates (list): List of update operations
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Update results and statistics
    """

    stats = {"updated": 0, "errors": []}

    try:
        for entry in updates:
            target_id = entry["id"]

            if "script" in entry:
                # Script-driven update.
                es.update(index, doc_type, target_id,
                          script=entry["script"],
                          params=entry.get("params", {}),
                          bulk=True)
            else:
                # Partial-document merge.
                es.partial_update(index, doc_type, target_id,
                                  doc=entry["doc"],
                                  bulk=True)

            stats["updated"] += 1

        # Send everything still sitting in the buffer.
        es.flush_bulk(forced=True)

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Bulk update examples
update_operations = [
    # Scripted counter increment.
    {"id": "1",
     "script": "ctx._source.views += params.increment",
     "params": {"increment": 10}},
    # Partial-document field merge.
    {"id": "2",
     "doc": {"category": "updated_category", "last_modified": "2023-12-01"}},
    # Scripted floor on the rating field.
    {"id": "3",
     "script": "ctx._source.rating = Math.max(ctx._source.rating, params.min_rating)",
     "params": {"min_rating": 3}},
]

update_results = bulk_update_documents(es, update_operations, "bulk_test", "document")
print(f"Updated {update_results['updated']} documents")

Bulk Deletions

def bulk_delete_documents(es, doc_ids, index, doc_type):
    """
    Delete many documents through the bulk buffer.

    Args:
        es (ES): ElasticSearch client instance
        doc_ids (list): List of document IDs to delete
        index (str): Target index name
        doc_type (str): Document type

    Returns:
        dict: Deletion results (``deleted``, ``not_found``, ``errors``)
    """

    stats = {"deleted": 0, "not_found": 0, "errors": []}

    try:
        for target in doc_ids:
            es.delete(index, doc_type, target, bulk=True)
            stats["deleted"] += 1

        # Execute the buffered deletions.
        response = es.flush_bulk(forced=True)

        # Reclassify 404 responses: those ids were already gone.
        if response and "items" in response:
            for item in response["items"]:
                outcome = item.get("delete")
                if outcome is not None and outcome.get("status") == 404:
                    stats["not_found"] += 1
                    stats["deleted"] -= 1

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Bulk deletion example: remove documents 1000-1999.
doc_ids_to_delete = [str(doc_id) for doc_id in range(1000, 2000)]
deletion_results = bulk_delete_documents(es, doc_ids_to_delete, "bulk_test", "document")

print(f"Deleted: {deletion_results['deleted']}")
print(f"Not found: {deletion_results['not_found']}")
print(f"Errors: {len(deletion_results['errors'])}")

Advanced Bulk Operations

Mixed Bulk Operations

def mixed_bulk_operations(es, operations):
    """
    Execute mixed bulk operations (index, update, delete) in a single batch.

    Args:
        es (ES): ElasticSearch client instance
        operations (list): Operation dicts; each must carry an
            ``operation`` key of "index", "update" or "delete" plus the
            fields that operation needs (index/type/id/doc/script).

    Returns:
        dict: Per-type operation counts and an ``errors`` list that
        includes unknown operation types and per-item failures reported
        in the bulk response.
    """

    stats = {
        "index_ops": 0,
        "update_ops": 0,
        "delete_ops": 0,
        "errors": []
    }

    try:
        for op in operations:
            op_type = op["operation"]

            if op_type == "index":
                es.index(op["doc"], op["index"], op["type"],
                         id=op.get("id"), bulk=True)
                stats["index_ops"] += 1

            elif op_type == "update":
                if "script" in op:
                    # Script-driven update.
                    es.update(op["index"], op["type"], op["id"],
                              script=op["script"],
                              params=op.get("params", {}),
                              bulk=True)
                else:
                    # Partial-document merge.
                    es.partial_update(op["index"], op["type"], op["id"],
                                      doc=op["doc"], bulk=True)
                stats["update_ops"] += 1

            elif op_type == "delete":
                es.delete(op["index"], op["type"], op["id"], bulk=True)
                stats["delete_ops"] += 1

            else:
                # Previously unknown types were silently dropped; record
                # them so callers can see the operation never ran.
                stats["errors"].append({"unknown_operation": op_type, "op": op})

        # Execute all buffered operations in one request.
        results = es.flush_bulk(forced=True)

        # Surface per-item failures from the bulk response.
        if results and "errors" in results and results["errors"]:
            for item in results.get("items", []):
                for action, result in item.items():
                    if "error" in result:
                        stats["errors"].append({
                            "action": action,
                            "id": result.get("_id"),
                            "error": result["error"]
                        })

    except Exception as e:
        stats["errors"].append({"general_error": str(e)})

    return stats

# Mixed operations example
mixed_ops = [
    # Two brand-new documents.
    {"operation": "index", "index": "mixed_test", "type": "doc",
     "id": "new_1", "doc": {"title": "New Document 1", "status": "active"}},
    {"operation": "index", "index": "mixed_test", "type": "doc",
     "id": "new_2", "doc": {"title": "New Document 2", "status": "active"}},
    # Partial-document and scripted updates to existing documents.
    {"operation": "update", "index": "mixed_test", "type": "doc",
     "id": "existing_1",
     "doc": {"last_updated": "2023-12-01", "status": "updated"}},
    {"operation": "update", "index": "mixed_test", "type": "doc",
     "id": "existing_2", "script": "ctx._source.view_count += 1"},
    # Stale documents to remove.
    {"operation": "delete", "index": "mixed_test", "type": "doc", "id": "old_1"},
    {"operation": "delete", "index": "mixed_test", "type": "doc", "id": "old_2"},
]

mixed_results = mixed_bulk_operations(es, mixed_ops)
print(f"Index operations: {mixed_results['index_ops']}")
print(f"Update operations: {mixed_results['update_ops']}")
print(f"Delete operations: {mixed_results['delete_ops']}")
print(f"Errors: {len(mixed_results['errors'])}")

Upsert Operations

def bulk_upsert_documents(es, upsert_ops, index, doc_type):
    """
    Upsert documents in bulk: update when present, create when absent.

    Args:
        es (ES): ElasticSearch client instance
        upsert_ops (list): Upsert entries; each has an ``id`` and any of
            ``doc``, ``upsert``, ``script``, ``params``. When ``upsert``
            is omitted, ``doc`` is used as the initial document.
        index (str): Target index
        doc_type (str): Document type

    Returns:
        dict: Upsert results (``upserted`` count plus ``errors``)
    """

    stats = {"upserted": 0, "errors": []}

    try:
        for entry in upsert_ops:
            body = entry.get("doc", {})

            # Update-with-upsert: creates the document when it is missing.
            es.update(
                index=index,
                doc_type=doc_type,
                id=entry["id"],
                document=body,
                upsert=entry.get("upsert", body),
                script=entry.get("script"),
                params=entry.get("params", {}),
                bulk=True,
            )

            stats["upserted"] += 1

        # Execute the buffered upserts.
        es.flush_bulk(forced=True)

    except Exception as e:
        stats["errors"].append(str(e))

    return stats

# Upsert examples
upsert_operations = [
    # Refresh last_seen, creating the profile on first sight.
    {"id": "user_123",
     "doc": {"name": "John Doe", "last_seen": "2023-12-01"},
     "upsert": {"name": "John Doe", "created": "2023-12-01", "last_seen": "2023-12-01"}},
    # Scripted visit counter with an initial document for new users.
    {"id": "user_456",
     "script": "if (ctx._source.containsKey('visit_count')) { ctx._source.visit_count += 1 } else { ctx._source.visit_count = 1 }",
     "upsert": {"visit_count": 1, "created": "2023-12-01"}},
]

upsert_results = bulk_upsert_documents(es, upsert_operations, "users", "profile")

Bulk Operation Patterns

Streaming Bulk Processor

class StreamingBulkProcessor:
    """
    Streaming bulk processor for continuous data ingestion.

    Provides automatic batching, error handling, and performance monitoring
    for high-volume data streams. Documents accumulate in an internal
    buffer and are flushed when the buffer reaches ``bulk_size`` or when
    ``flush_interval`` seconds have passed since the last flush.
    """
    
    def __init__(self, es_client, bulk_size=1000, flush_interval=30, 
                 max_retries=3, retry_delay=5):
        """
        Initialize StreamingBulkProcessor.
        
        Args:
            es_client (ES): ElasticSearch client
            bulk_size (int): Documents per batch. Default: 1000
            flush_interval (int): Auto-flush interval in seconds. Default: 30
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5
        """
        self.es = es_client
        self.bulk_size = bulk_size
        self.flush_interval = flush_interval
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        
        self.buffer = []
        # Record creation time: get_stats() reports uptime relative to this.
        # (Previously start_time was never assigned, so the getattr fallback
        # in get_stats made uptime always ~0.)
        self.start_time = time.time()
        self.last_flush = self.start_time
        self.stats = {
            "processed": 0,
            "errors": 0,
            "retries": 0,
            "flushes": 0
        }
    
    def add_document(self, doc, index, doc_type, doc_id=None, operation="index"):
        """
        Add document to processing buffer.
        
        Args:
            doc (dict): Document to process
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID is
                generated when omitted.
            operation (str): Operation type (index, update, delete). Default: "index"
        """
        
        self.buffer.append({
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        })
        
        # Auto-flush if buffer is full or time interval exceeded
        if (len(self.buffer) >= self.bulk_size or 
            time.time() - self.last_flush > self.flush_interval):
            self.flush()
    
    def flush(self):
        """Flush buffered operations to ElasticSearch, retrying on failure."""
        
        if not self.buffer:
            return
        
        retry_count = 0
        success = False
        
        while not success and retry_count <= self.max_retries:
            try:
                # Re-queue every buffered operation on the client.
                # NOTE(review): if a previous attempt partially buffered
                # operations on the client before failing, a retry may
                # duplicate them — confirm the client's buffer semantics.
                for op in self.buffer:
                    if op["operation"] == "index":
                        self.es.index(op["doc"], op["index"], op["type"], 
                                    id=op["id"], bulk=True)
                    elif op["operation"] == "update":
                        self.es.partial_update(op["index"], op["type"], 
                                             op["id"], doc=op["doc"], bulk=True)
                    elif op["operation"] == "delete":
                        self.es.delete(op["index"], op["type"], op["id"], bulk=True)
                
                # Execute bulk operations
                self.es.flush_bulk(forced=True)
                
                # Update statistics
                self.stats["processed"] += len(self.buffer)
                self.stats["flushes"] += 1
                
                # Clear buffer and update timestamp
                self.buffer.clear()
                self.last_flush = time.time()
                success = True
                
            except Exception as e:
                retry_count += 1
                self.stats["retries"] += 1
                
                if retry_count <= self.max_retries:
                    print(f"Bulk operation failed, retrying ({retry_count}/{self.max_retries}): {e}")
                    time.sleep(self.retry_delay)
                else:
                    self.stats["errors"] += len(self.buffer)
                    print(f"Bulk operation failed after {self.max_retries} retries: {e}")
                    # Could save failed operations to dead letter queue here
                    self.buffer.clear()
    
    def get_stats(self):
        """Get processor statistics, including live buffer size and uptime."""
        return {
            **self.stats,
            "buffer_size": len(self.buffer),
            "last_flush": self.last_flush,
            "uptime": time.time() - self.start_time
        }
    
    def close(self):
        """Flush remaining operations and close processor."""
        self.flush()

# Usage example: wire a streaming processor to the shared client.
import json
import time
import uuid

processor = StreamingBulkProcessor(es, bulk_size=2000, flush_interval=60)

# Simulate continuous data stream
def simulate_data_stream():
    """Feed 100k synthetic events through the streaming processor."""

    for seq in range(100000):
        event = {
            "timestamp": time.time(),
            "event_type": f"event_{seq % 10}",
            "user_id": f"user_{seq % 1000}",
            "data": {"value": seq, "category": f"cat_{seq % 5}"},
        }
        processor.add_document(event, "events", "event")

        # Periodically report progress and back off briefly.
        if seq % 1000 == 0:
            snapshot = processor.get_stats()
            print(f"Processed: {snapshot['processed']}, Buffer: {snapshot['buffer_size']}")
            time.sleep(0.1)

    # Drain the buffer and report totals.
    processor.close()
    final_stats = processor.get_stats()
    print(f"Final stats: {final_stats}")

# Run simulation
simulate_data_stream()

Parallel Bulk Processing

import threading
import queue
import concurrent.futures

class ParallelBulkProcessor:
    """
    Parallel bulk processor for maximum throughput.

    Uses multiple threads to process bulk operations concurrently. Each
    worker drains a shared queue into batches and flushes every batch
    through its own ES client.
    """
    
    def __init__(self, es_client, num_workers=4, bulk_size=1000, queue_size=10000):
        """
        Initialize ParallelBulkProcessor.
        
        Args:
            es_client (ES): ElasticSearch client
            num_workers (int): Number of worker threads. Default: 4
            bulk_size (int): Documents per bulk request. Default: 1000
            queue_size (int): Maximum queue size. Default: 10000
        """
        self.es = es_client
        self.num_workers = num_workers
        self.bulk_size = bulk_size
        
        # Thread-safe queue for operations
        self.operation_queue = queue.Queue(maxsize=queue_size)
        self.result_queue = queue.Queue()
        
        # Worker management
        self.workers = []
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_workers)
        self.running = False
        
        # Statistics
        # NOTE(review): counters are updated from multiple threads without a
        # lock; dict-entry "+=" is not atomic, so values are approximate
        # under contention.
        self.stats = {
            "queued": 0,
            "processed": 0,
            "errors": 0,
            "active_workers": 0
        }
    
    def start(self):
        """Start the worker threads."""
        
        self.running = True
        
        # Start worker threads
        for i in range(self.num_workers):
            future = self.executor.submit(self._worker_loop, i)
            self.workers.append(future)
    
    def _worker_loop(self, worker_id):
        """Worker thread main loop: batch queue items and flush them."""
        
        batch = []
        
        while self.running or not self.operation_queue.empty():
            try:
                # Get operation from queue (with timeout)
                operation = self.operation_queue.get(timeout=1.0)
                batch.append(operation)
                
                # Process batch when full
                if len(batch) >= self.bulk_size:
                    self._process_batch(batch, worker_id)
                    batch = []
                
                self.operation_queue.task_done()
                
            except queue.Empty:
                # Flush a partial batch once shutdown has been signalled.
                if batch and not self.running:
                    self._process_batch(batch, worker_id) 
                    batch = []
                continue
            except Exception as e:
                self.stats["errors"] += 1
                print(f"Worker {worker_id} error: {e}")
        
        # Process final batch
        if batch:
            self._process_batch(batch, worker_id)
    
    def _process_batch(self, batch, worker_id):
        """Flush one batch of operations through a dedicated ES client."""
        
        try:
            self.stats["active_workers"] += 1
            
            # Create separate ES client for this worker to avoid conflicts
            worker_es = ES(self.es.server)
            worker_es.bulk_size = len(batch)
            
            for op in batch:
                if op["operation"] == "index":
                    worker_es.index(op["doc"], op["index"], op["type"],
                                  id=op["id"], bulk=True)
                elif op["operation"] == "update":
                    worker_es.partial_update(op["index"], op["type"], 
                                           op["id"], doc=op["doc"], bulk=True)
                elif op["operation"] == "delete":
                    worker_es.delete(op["index"], op["type"], op["id"], bulk=True)
            
            # Execute bulk operations
            worker_es.flush_bulk(forced=True)
            
            # Update statistics
            self.stats["processed"] += len(batch)
            
            # Queue results for monitoring
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch),
                "success": True,
                "timestamp": time.time()
            })
            
        except Exception as e:
            self.stats["errors"] += len(batch)
            self.result_queue.put({
                "worker_id": worker_id,
                "batch_size": len(batch), 
                "success": False,
                "error": str(e),
                "timestamp": time.time()
            })
        finally:
            self.stats["active_workers"] -= 1
    
    def add_operation(self, operation, index, doc_type, doc_id=None, doc=None):
        """
        Add operation to processing queue.
        
        Args:
            operation (str): Operation type (index, update, delete)
            index (str): Target index
            doc_type (str): Document type
            doc_id (str, optional): Document ID; a random UUID is
                generated when omitted.
            doc (dict, optional): Document data
        """
        
        op = {
            "operation": operation,
            "index": index,
            "type": doc_type,
            "id": doc_id or str(uuid.uuid4()),
            "doc": doc
        }
        
        self.operation_queue.put(op)
        self.stats["queued"] += 1
    
    def stop(self, timeout=60):
        """
        Stop the processor and wait for completion.
        
        Args:
            timeout (int): Maximum time in seconds to wait for worker
                threads to finish. Default: 60
        """
        
        # Signal workers to stop
        self.running = False
        
        # Wait for queue to empty
        self.operation_queue.join()
        
        # Wait for worker futures with a bounded timeout, then shut the
        # pool down. ThreadPoolExecutor.shutdown() accepts no timeout
        # argument, so the original shutdown(wait=True, timeout=timeout)
        # call raised TypeError on every stop().
        concurrent.futures.wait(self.workers, timeout=timeout)
        self.executor.shutdown(wait=True)
    
    def get_stats(self):
        """Get current processing statistics."""
        return {
            **self.stats,
            "queue_size": self.operation_queue.qsize(),
            "result_queue_size": self.result_queue.qsize()
        }

# Usage example
def parallel_bulk_example():
    """Drive the parallel processor with 100k synthetic documents."""

    worker_pool = ParallelBulkProcessor(es, num_workers=8, bulk_size=1500)
    worker_pool.start()

    try:
        for n in range(100000):
            payload = {
                "id": n,
                "title": f"Parallel Document {n}",
                "content": f"Content for parallel processing test {n}",
                "batch": n // 1000,
                "timestamp": time.time(),
            }
            worker_pool.add_operation("index", "parallel_test", "doc", str(n), payload)

            # Periodic progress report.
            if n % 10000 == 0:
                snapshot = worker_pool.get_stats()
                print(f"Queued: {snapshot['queued']}, Processed: {snapshot['processed']}, "
                      f"Active Workers: {snapshot['active_workers']}")

    finally:
        # Always drain and shut down, even if queuing failed part-way.
        worker_pool.stop(timeout=120)

        final_stats = worker_pool.get_stats()
        print(f"Final stats: {final_stats}")

# Run parallel processing example
parallel_bulk_example()

Error Handling and Recovery

Robust Bulk Error Handling

from pyes import BulkOperationException
import logging

class RobustBulkProcessor:
    """
    Bulk processor with comprehensive error handling and recovery.

    Failures are split into retryable failures — whose original operation
    dicts are kept in ``failed_operations`` for ``retry_failed_operations``
    — and permanent failures, whose error records go to
    ``dead_letter_queue``.
    """
    
    def __init__(self, es_client, bulk_size=1000):
        self.es = es_client
        self.bulk_size = bulk_size
        self.logger = logging.getLogger("bulk_processor")
        
        # Error tracking
        self.error_stats = {
            "version_conflicts": 0,
            "document_missing": 0,
            "index_missing": 0,
            "mapping_errors": 0,
            "timeout_errors": 0,
            "other_errors": 0
        }
        
        # Operation dicts whose failures look transient, queued for retry.
        self.failed_operations = []
        # Error records for permanently failed operations.
        self.dead_letter_queue = []
    
    def process_with_error_handling(self, operations):
        """
        Process operations with comprehensive error handling.

        Retryable failures are appended to ``self.failed_operations`` as
        the original operation dicts so retry_failed_operations() can
        replay them. (Previously nothing ever populated that list, so the
        retry path was dead code.)
        
        Args:
            operations (list): List of bulk operation dicts, each with an
                ``operation`` key of index/update/delete.
            
        Returns:
            dict: Processing results with error details
        """
        
        results = {
            "successful": 0,
            "failed": 0,
            "errors": [],
            "retry_needed": []
        }
        
        try:
            # Queue every operation on the client's bulk buffer.
            for op in operations:
                self._add_operation_to_bulk(op)
            
            bulk_results = self.es.flush_bulk(forced=True)
            
            # Process per-item results for error handling
            if bulk_results and "items" in bulk_results:
                self._process_bulk_results(bulk_results["items"], operations, results)
            
        except BulkOperationException as e:
            self.logger.error(f"Bulk operation exception: {e}")
            results["errors"].append({"type": "bulk_exception", "message": str(e)})
            
            # Handle specific bulk errors
            if hasattr(e, 'errors') and e.errors:
                for error_item in e.errors:
                    self._categorize_error(error_item, results)
        
        except Exception as e:
            self.logger.error(f"Unexpected error during bulk processing: {e}")
            results["errors"].append({"type": "unexpected", "message": str(e)})
        
        return results
    
    def _add_operation_to_bulk(self, op):
        """Queue one operation on the client's bulk buffer."""
        
        if op["operation"] == "index":
            self.es.index(op["doc"], op["index"], op["type"], 
                         id=op["id"], bulk=True)
        elif op["operation"] == "update":
            self.es.partial_update(op["index"], op["type"], op["id"],
                                 doc=op["doc"], bulk=True)
        elif op["operation"] == "delete":
            self.es.delete(op["index"], op["type"], op["id"], bulk=True)
    
    def _process_bulk_results(self, items, operations, results):
        """
        Correlate bulk response items with the submitted operations.

        The _bulk endpoint returns one item per operation in request
        order, so zip(operations, items) pairs each result with the
        operation that produced it.
        """
        
        for op, item in zip(operations, items):
            for action, result in item.items():
                if "error" in result:
                    # Operation failed
                    results["failed"] += 1
                    error_info = {
                        "action": action,
                        "id": result.get("_id"),
                        "status": result.get("status"),
                        "error": result["error"],
                        "operation": op  # original op, kept so it can be replayed
                    }
                    
                    # Categorize error for appropriate handling
                    if self._is_retryable_error(result):
                        results["retry_needed"].append(error_info)
                        self.failed_operations.append(op)
                    else:
                        results["errors"].append(error_info)
                        self.dead_letter_queue.append(error_info)
                    
                    self._update_error_stats(result["error"])
                else:
                    # Operation successful
                    results["successful"] += 1
    
    def _is_retryable_error(self, result):
        """Return True for transient failures worth retrying."""
        
        error = result.get("error", {})
        error_type = error.get("type", "")
        status = result.get("status", 0)
        
        # Retryable conditions
        if status in [429, 503, 504]:  # Rate limited or service unavailable
            return True
        if error_type in ["timeout_exception", "connect_timeout_exception"]:
            return True
        if "circuit_breaking_exception" in error_type:
            return True
        
        return False
    
    def _categorize_error(self, error_item, results):
        """
        Categorize one error item for statistics.

        Kept for signature compatibility; delegates to _update_error_stats
        instead of duplicating its if/elif chain verbatim as before.
        """
        
        self._update_error_stats(error_item.get("error", {}))
    
    def _update_error_stats(self, error):
        """Bump the error-statistics counter matching the error type."""
        
        error_type = error.get("type", "")
        
        if "version_conflict" in error_type:
            self.error_stats["version_conflicts"] += 1
        elif "document_missing" in error_type:
            self.error_stats["document_missing"] += 1
        elif "index_not_found" in error_type:
            self.error_stats["index_missing"] += 1
        elif "mapper_parsing" in error_type:
            self.error_stats["mapping_errors"] += 1
        elif "timeout" in error_type:
            self.error_stats["timeout_errors"] += 1
        else:
            self.error_stats["other_errors"] += 1
    
    def retry_failed_operations(self, max_retries=3, retry_delay=5):
        """
        Retry operations that failed with retryable errors.
        
        Args:
            max_retries (int): Maximum retry attempts. Default: 3
            retry_delay (int): Delay between retries in seconds. Default: 5
            
        Returns:
            dict: Retry results
        """
        
        retry_results = {"successful_retries": 0, "permanent_failures": 0}
        
        for attempt in range(max_retries):
            if not self.failed_operations:
                break
            
            self.logger.info(f"Retry attempt {attempt + 1} for {len(self.failed_operations)} operations")
            
            # Replay the queued operation dicts; process_with_error_handling
            # re-populates failed_operations with any that fail again.
            retry_batch = self.failed_operations.copy()
            self.failed_operations.clear()
            
            results = self.process_with_error_handling(retry_batch)
            
            retry_results["successful_retries"] += results["successful"]
            
            if not self.failed_operations:
                break
            
            time.sleep(retry_delay)
        
        # Whatever is still failing is considered permanent.
        retry_results["permanent_failures"] = len(self.failed_operations)
        self.dead_letter_queue.extend(self.failed_operations)
        self.failed_operations.clear()
        
        return retry_results
    
    def get_error_summary(self):
        """Get comprehensive error summary."""
        
        return {
            "error_stats": self.error_stats,
            "failed_operations": len(self.failed_operations),
            "dead_letter_queue": len(self.dead_letter_queue),
            "total_errors": sum(self.error_stats.values())
        }

# Usage example with error handling
def robust_bulk_processing_example():
    """Example of robust bulk processing with error handling."""

    processor = RobustBulkProcessor(es, bulk_size=1000)

    def _build_operation(i):
        # Deliberately mix good and problematic operations to exercise
        # the error-handling paths.
        if i % 100 == 0:
            # Version conflict (trying to update non-existent doc with version)
            return {
                "operation": "update",
                "index": "test_index",
                "type": "doc",
                "id": f"conflict_{i}",
                "doc": {"field": "value"},
                "version": 99  # Will cause version conflict
            }
        if i % 150 == 0:
            # Invalid document (missing required field)
            return {
                "operation": "index",
                "index": "strict_index",  # Index with strict mapping
                "type": "doc",
                "id": str(i),
                "doc": {"invalid_field": "value"}  # Will cause mapping error
            }
        # Valid operation
        return {
            "operation": "index",
            "index": "test_index",
            "type": "doc",
            "id": str(i),
            "doc": {"title": f"Document {i}", "content": f"Content {i}"}
        }

    operations = [_build_operation(i) for i in range(5000)]

    # Process with error handling
    results = processor.process_with_error_handling(operations)

    print(f"Successful operations: {results['successful']}")
    print(f"Failed operations: {results['failed']}")
    print(f"Operations needing retry: {len(results['retry_needed'])}")

    # Retry failed operations
    retry_results = processor.retry_failed_operations()
    print(f"Successful retries: {retry_results['successful_retries']}")
    print(f"Permanent failures: {retry_results['permanent_failures']}")

    # Error summary
    error_summary = processor.get_error_summary()
    print(f"Error summary: {error_summary}")

# Run robust processing example
robust_bulk_processing_example()

Performance Optimization

Bulk Performance Tuning

def optimize_bulk_performance():
    """Comprehensive bulk performance optimization strategies.

    Returns a dict of callables covering bulk sizing, connection tuning,
    document preparation, memory-friendly batching, and index settings.
    """

    # 1. Optimal bulk size calculation
    def calculate_optimal_bulk_size(avg_doc_size_kb, available_memory_mb,
                                    network_latency_ms):
        """Calculate optimal bulk size based on system characteristics."""

        target_bulk_mb = 10  # Target: 5-15MB per bulk request

        # Starting point from average document size
        base_bulk_size = int(target_bulk_mb * (1024 / avg_doc_size_kb))

        # Cap by available memory (bulk buffer limited to 10% of RAM)
        memory_limit = available_memory_mb * 0.1 * 1024 / avg_doc_size_kb
        memory_limited_size = int(min(base_bulk_size, memory_limit))

        # Latency-driven adjustment: amortize network cost on slow links,
        # favour faster feedback on fast ones
        if network_latency_ms > 100:
            adjusted = min(memory_limited_size * 2, 10000)
        elif network_latency_ms < 20:
            adjusted = max(memory_limited_size // 2, 500)
        else:
            adjusted = memory_limited_size

        # Clamp to reasonable bounds
        return max(100, min(adjusted, 10000))

    # 2. Connection optimization
    def optimize_es_connection():
        """Optimize ElasticSearch connection for bulk operations."""

        config = dict(
            server=["es1.example.com:9200", "es2.example.com:9200"],  # Multiple nodes
            timeout=120.0,        # Longer timeout for bulk operations
            max_retries=3,        # Retry failed requests
            retry_time=30,        # Wait between retries
            bulk_size=5000,       # Optimized bulk size
            # Connection pooling (implementation-specific)
            connection_pool_size=10,
            connection_keep_alive=True,
        )
        return ES(**config)

    # 3. Document preparation optimization
    def prepare_documents_efficiently(raw_docs):
        """Efficiently prepare documents for bulk indexing."""

        prepared_docs = []

        for doc in raw_docs:
            optimized_doc = {}
            for key, value in doc.items():
                # Drop empty values and internal bookkeeping fields
                if value is None or value == "" or key == "_internal":
                    continue
                optimized_doc[key] = value

            if "timestamp" in optimized_doc:
                # Epoch integer is smaller on the wire than an ISO string
                optimized_doc["timestamp"] = int(optimized_doc["timestamp"])

            if "content" in optimized_doc and len(optimized_doc["content"]) > 1000:
                # Could implement compression here
                pass

            prepared_docs.append(optimized_doc)

        return prepared_docs

    # 4. Memory management
    def memory_efficient_bulk_processing(documents, es_client):
        """Process documents with memory efficiency."""

        import gc

        batch_size = 10000  # Process in memory-friendly batches
        total_processed = 0

        for start in range(0, len(documents), batch_size):
            batch = documents[start:start + batch_size]

            # Queue batch for bulk indexing
            for doc in batch:
                es_client.index(doc, "optimized_index", "doc", bulk=True)

            # Flush and cleanup
            es_client.flush_bulk(forced=True)
            total_processed += len(batch)

            # Periodically reclaim memory
            if total_processed % (batch_size * 5) == 0:
                gc.collect()

            print(f"Processed {total_processed}/{len(documents)} documents")

        return total_processed

    # 5. Index optimization for bulk operations
    def optimize_index_for_bulk_operations(es_client, index_name):
        """Optimize index settings for bulk operations."""

        bulk_settings = {
            "refresh_interval": "-1",           # Disable auto-refresh
            "number_of_replicas": 0,           # Reduce replicas during bulk load
            "translog.flush_threshold_size": "1gb",  # Larger translog
            "merge.policy.max_merged_segment": "5gb"  # Larger segments
        }
        es_client.indices.update_settings(index_name, bulk_settings)

        # Original settings to restore after the bulk load
        return {
            "refresh_interval": "1s",
            "number_of_replicas": 1,
            "translog.flush_threshold_size": "512mb",
            "merge.policy.max_merged_segment": "5gb"
        }

    # 6. Post-bulk optimization
    def post_bulk_optimization(es_client, index_name, original_settings):
        """Restore optimal settings after bulk operations."""

        # Restore original settings
        es_client.indices.update_settings(index_name, original_settings)

        # Force refresh to make documents searchable
        es_client.indices.refresh(index_name)

        # Force merge to optimize segments
        es_client.indices.optimize(index_name, max_num_segments=1)

    return {
        "calculate_bulk_size": calculate_optimal_bulk_size,
        "optimize_connection": optimize_es_connection,
        "prepare_documents": prepare_documents_efficiently,
        "memory_efficient_processing": memory_efficient_bulk_processing,
        "optimize_index": optimize_index_for_bulk_operations,
        "post_optimization": post_bulk_optimization
    }

# Example of comprehensive bulk optimization
def optimized_bulk_pipeline():
    """Complete optimized bulk processing pipeline."""

    # Gather the optimization helpers
    optimizers = optimize_bulk_performance()

    # Derive a bulk size from example system characteristics
    optimal_size = optimizers["calculate_bulk_size"](
        avg_doc_size_kb=2.5,      # 2.5KB average document size
        available_memory_mb=8192,  # 8GB available memory
        network_latency_ms=50     # 50ms network latency
    )
    print(f"Calculated optimal bulk size: {optimal_size}")

    # Tune the ES connection and apply the computed bulk size
    optimized_es = optimizers["optimize_connection"]()
    optimized_es.bulk_size = optimal_size

    # Switch the index into bulk-friendly settings, remembering the originals
    index_name = "optimized_bulk_index"
    original_settings = optimizers["optimize_index"](optimized_es, index_name)

    try:
        # Generate and prepare documents
        raw_documents = []
        for i in range(100000):
            raw_documents.append({
                "id": i,
                "title": f"Optimized Document {i}",
                "content": f"Optimized content for document {i}" * 10,
                "timestamp": time.time() + i,
            })

        prepared_docs = optimizers["prepare_documents"](raw_documents)

        # Process with memory efficiency
        processed_count = optimizers["memory_efficient_processing"](
            prepared_docs, optimized_es
        )

        print(f"Successfully processed {processed_count} documents")

    finally:
        # Always restore index settings, even if processing failed
        optimizers["post_optimization"](optimized_es, index_name, original_settings)

# Run optimized pipeline
optimized_bulk_pipeline()

PyES bulk operations provide powerful capabilities for high-performance data processing, with comprehensive support for error handling, parallel processing, and performance optimization to handle large-scale data ingestion efficiently.

Install with Tessl CLI

npx tessl i tessl/pypi-pyes

docs

bulk-operations.md

client.md

facets-aggregations.md

filters.md

index.md

mappings.md

query-dsl.md

rivers.md

tile.json