CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch5

Python client for Elasticsearch 5.x providing comprehensive access to all Elasticsearch APIs and features.

Pending
Overview
Eval results
Files

bulk-operations.mddocs/

Bulk Operations

High-performance operations for processing multiple documents efficiently. Includes bulk indexing, updates, deletions, and specialized helper functions for streaming and parallel processing of large datasets.

Capabilities

Bulk Document Operations

Execute multiple document operations (index, create, update, delete) in a single request for improved performance.

def bulk(body: list, index: str = None, doc_type: str = None, **params) -> dict:
    """
    Execute multiple document operations in a single request.
    
    Parameters:
    - body: List of operations (action/metadata and optional source)
    - index: Default index for operations without explicit index
    - doc_type: Default document type
    - pipeline: Default ingest pipeline
    - refresh: Control when changes are visible ('true', 'false', 'wait_for')
    - routing: Default routing value
    - timeout: Request timeout
    - wait_for_active_shards: Wait for N shards to be active
    
    Body format (list of alternating action lines and document lines):
    [
        {"index": {"_index": "my_index", "_type": "_doc", "_id": "1"}},
        {"title": "Document 1", "content": "Content here"},
        {"create": {"_index": "my_index", "_type": "_doc", "_id": "2"}},
        {"title": "Document 2", "content": "More content"},
        {"update": {"_index": "my_index", "_type": "_doc", "_id": "3"}},
        {"doc": {"title": "Updated Document 3"}},
        {"delete": {"_index": "my_index", "_type": "_doc", "_id": "4"}}
    ]
    
    Returns:
    dict: Bulk response with 'items' array containing results for each operation
    """

Update by Query

Update multiple documents matching a query using scripts or partial updates.

def update_by_query(index: str, doc_type: str = None, body: dict = None, **params) -> dict:
    """
    Update documents matching a query.
    
    Parameters:
    - index: Index name(s) to update
    - doc_type: Document type(s)
    - body: Update specification with query and script
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle version conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - pipeline: Ingest pipeline for updated documents
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate (operations per second)
    - routing: Routing values
    - scroll: Scroll timeout for large updates
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to update
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions in response
    - version_type: Version type
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task
    
    Body structure:
    {
        "query": {
            "term": {"status": "draft"}
        },
        "script": {
            "source": "ctx._source.status = 'published'; ctx._source.published_at = params.now",
            "params": {"now": "2023-01-01T12:00:00Z"}
        }
    }
    
    Returns:
    dict: Update results with 'updated', 'version_conflicts', 'took', etc.
    """

Delete by Query

Delete multiple documents matching a query efficiently.

def delete_by_query(index: str, body: dict, doc_type: str = None, **params) -> dict:
    """
    Delete documents matching a query.
    
    Parameters:
    - index: Index name(s) to delete from
    - body: Delete specification with query
    - doc_type: Document type(s)
    - _source: Fields to include in response
    - _source_excludes: Fields to exclude
    - _source_includes: Fields to include
    - allow_no_indices: Handle missing indices
    - analyzer: Query analyzer
    - analyze_wildcard: Analyze wildcards
    - conflicts: How to handle conflicts ('abort', 'proceed')
    - default_operator: Default query operator
    - df: Default field
    - expand_wildcards: Wildcard expansion
    - from_: Starting document
    - ignore_unavailable: Ignore unavailable indices
    - lenient: Ignore query failures
    - preference: Node preference
    - q: Query string
    - refresh: Refresh after operation
    - request_cache: Use request cache
    - requests_per_second: Throttling rate
    - routing: Routing values
    - scroll: Scroll timeout
    - scroll_size: Scroll batch size
    - search_type: Search type
    - search_timeout: Search timeout
    - size: Maximum documents to delete
    - slices: Number of slices for parallel processing
    - sort: Sort specification
    - terminate_after: Terminate after N documents
    - timeout: Request timeout
    - version: Include versions
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion
    
    Body structure:
    {
        "query": {
            "range": {
                "created_at": {
                    "lt": "2022-01-01"
                }
            }
        }
    }
    
    Returns:
    dict: Deletion results with 'deleted', 'took', 'version_conflicts', etc.
    """

Reindex Operations

Copy and transform documents between indices with optional query filtering and script processing.

def reindex(body: dict, **params) -> dict:
    """
    Copy documents from source to destination index with optional transformation.
    
    Parameters:
    - body: Reindex specification with source and destination
    - refresh: Refresh destination index after operation
    - requests_per_second: Throttling rate
    - slices: Number of slices for parallel processing
    - timeout: Request timeout
    - wait_for_active_shards: Wait for shards
    - wait_for_completion: Wait for completion or return task
    
    Body structure:
    {
        "source": {
            "index": "source_index",
            "type": "_doc",                    # Optional
            "query": {                         # Optional filtering
                "term": {"status": "published"}
            },
            "_source": ["title", "content"],   # Optional field filtering
            "size": 1000                       # Batch size
        },
        "dest": {
            "index": "destination_index",
            "type": "_doc",                    # Optional
            "pipeline": "my_pipeline"          # Optional ingest pipeline
        },
        "script": {                            # Optional transformation
            "source": "ctx._source.new_field = ctx._source.old_field + '_transformed'"
        },
        "conflicts": "proceed"                 # Handle version conflicts
    }
    
    Returns:
    dict: Reindex results with 'created', 'updated', 'took', etc.
    """

def reindex_rethrottle(task_id: str = None, **params) -> dict:
    """
    Change throttling of a running reindex task.
    
    Parameters:
    - task_id: Task identifier from reindex operation
    - requests_per_second: New throttling rate
    
    Returns:
    dict: Updated task information
    """

Helper Functions

High-level helper utilities for common bulk operations with automatic batching, error handling, and progress tracking.

from elasticsearch5.helpers import bulk, streaming_bulk, parallel_bulk, scan, reindex

def bulk(client, actions, stats_only: bool = False, **kwargs) -> tuple:
    """
    Helper for bulk operations with automatic batching and error handling.
    
    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of document actions
    - stats_only: Return only success count (bool)
    - index: Default index name
    - doc_type: Default document type
    - chunk_size: Documents per batch (default 500)
    - max_chunk_bytes: Maximum batch size in bytes
    - thread_count: Number of threads for parallel processing
    - queue_size: Queue size for threading
    - expand_action_callback: Function to transform actions
    - refresh: Refresh after operation
    - request_timeout: Request timeout
    - max_retries: Maximum retry attempts
    - initial_backoff: Initial retry delay
    - max_backoff: Maximum retry delay
    
    Action format:
    {
        "_op_type": "index",  # or "create", "update", "delete"
        "_index": "my_index",
        "_type": "_doc",
        "_id": "document_id",
        "_source": {"field": "value"}  # For index/create
    }
    
    Returns:
    If stats_only=False: (success_count, list_of_errors)
    If stats_only=True: success_count
    
    Raises:
    BulkIndexError: If errors occurred and not ignored
    """

def streaming_bulk(client, actions, chunk_size: int = 500, **kwargs):
    """
    Generator for streaming bulk operations.
    
    Parameters: Same as bulk()
    
    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def parallel_bulk(client, actions, thread_count: int = 4, **kwargs):
    """
    Generator for parallel bulk operations using multiple threads.
    
    Parameters: Same as bulk() plus thread_count
    
    Yields:
    (is_success: bool, action_result: dict) for each action
    """

def scan(client, query: dict = None, scroll: str = '5m', **kwargs):
    """
    Generator to efficiently scroll through all matching documents.
    
    Parameters:
    - client: Elasticsearch client
    - query: Search query (default match_all)
    - scroll: Scroll timeout
    - index: Index name(s)
    - doc_type: Document type(s)
    - raise_on_error: Raise on scroll errors
    - preserve_order: Maintain result ordering
    - size: Documents per scroll batch
    - request_timeout: Request timeout
    - clear_scroll: Clear scroll on completion
    
    Yields:
    Individual documents from search results
    """

def reindex(client, source_index: str, target_index: str, query: dict = None, **kwargs) -> tuple:
    """
    Helper to reindex documents between indices.
    
    Parameters:
    - client: Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Optional query to filter documents
    - target_client: Different client for target (for cross-cluster)
    - chunk_size: Documents per batch
    - scroll: Scroll timeout
    - scan_kwargs: Additional scan() parameters
    - bulk_kwargs: Additional bulk() parameters
    
    Returns:
    (success_count, list_of_errors)
    """

Helper Exceptions

from elasticsearch5.helpers import BulkIndexError, ScanError

class BulkIndexError(Exception):
    """
    Exception for bulk operation failures.
    
    Attributes:
    - errors: List of failed actions with error details
    """

class ScanError(Exception):
    """
    Exception for scan operation failures.
    
    Attributes:
    - scroll_id: Scroll ID for potential resume
    """

Usage Examples

Basic Bulk Operations

from elasticsearch5 import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# Prepare bulk operations
actions = [
    {
        "_op_type": "index",
        "_index": "articles",
        "_type": "_doc",
        "_id": "1",
        "_source": {"title": "Article 1", "content": "Content 1"}
    },
    {
        "_op_type": "create",
        "_index": "articles", 
        "_type": "_doc",
        "_id": "2",
        "_source": {"title": "Article 2", "content": "Content 2"}
    },
    {
        "_op_type": "update",
        "_index": "articles",
        "_type": "_doc", 
        "_id": "3",
        "_source": {"doc": {"title": "Updated Article 3"}}
    },
    {
        "_op_type": "delete",
        "_index": "articles",
        "_type": "_doc",
        "_id": "4"
    }
]

# Execute bulk operations
response = es.bulk(body=actions)

# Check for errors
if response['errors']:
    for item in response['items']:
        for operation, result in item.items():
            if 'error' in result:
                print(f"Error in {operation}: {result['error']}")
else:
    print(f"Successfully processed {len(response['items'])} operations")

Using Helper Functions

from elasticsearch5.helpers import bulk, scan, reindex

# Generate documents
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "large_index",
            "_type": "_doc",
            "_id": str(i),
            "_source": {
                "title": f"Document {i}",
                "content": f"This is content for document {i}",
                "category": "bulk_test"
            }
        }

# Bulk index with helper
try:
    success, failed = bulk(es, generate_docs(), chunk_size=1000)
    print(f"Successfully indexed {success} documents")
    if failed:
        print(f"Failed to index {len(failed)} documents")
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")

Streaming Bulk Processing

from elasticsearch5.helpers import streaming_bulk

# Process large dataset with streaming
def process_large_dataset():
    for is_success, result in streaming_bulk(es, generate_docs(), chunk_size=500):
        if not is_success:
            print(f"Failed to process: {result}")
        else:
            # Process successful result
            pass

process_large_dataset()

Update by Query

# Update all draft articles to published
update_body = {
    "query": {
        "term": {"status": "draft"}
    },
    "script": {
        "source": """
            ctx._source.status = 'published';
            ctx._source.published_at = params.now;
            ctx._source.publish_count = (ctx._source.publish_count ?: 0) + 1
        """,
        "params": {
            "now": "2023-01-01T12:00:00Z"
        }
    }
}

response = es.update_by_query(
    index='articles',
    body=update_body,
    conflicts='proceed',  # Continue on version conflicts
    refresh=True
)

print(f"Updated {response['updated']} documents")
print(f"Version conflicts: {response.get('version_conflicts', 0)}")

Reindex with Transformation

# Reindex with field transformation and filtering
reindex_body = {
    "source": {
        "index": "old_articles",
        "query": {
            "range": {
                "created_at": {
                    "gte": "2022-01-01"
                }
            }
        }
    },
    "dest": {
        "index": "new_articles",
        "pipeline": "article_enrichment"
    },
    "script": {
        "source": """
            // Transform old format to new format
            ctx._source.slug = ctx._source.title.toLowerCase().replaceAll('[^a-z0-9]+', '-');
            ctx._source.word_count = ctx._source.content.split(' ').length;
            ctx._source.migrated_at = '2023-01-01T00:00:00Z';
        """
    }
}

response = es.reindex(
    body=reindex_body,
    wait_for_completion=False,  # Run as task
    slices='auto'  # Parallel processing
)

if 'task' in response:
    task_id = response['task']
    print(f"Reindex started as task: {task_id}")
    
    # Check task status later
    task_status = es.tasks.get(task_id=task_id)
    print(f"Task status: {task_status}")

Scan and Process Large Datasets

from elasticsearch5.helpers import scan

# Process all documents in an index
query = {
    "query": {
        "range": {
            "created_at": {
                "gte": "2023-01-01"
            }
        }
    }
}

# Scan through all matching documents
processed_count = 0
for doc in scan(es, query=query, index='large_index', size=1000):
    # Process each document
    process_document(doc['_source'])
    processed_count += 1
    
    if processed_count % 10000 == 0:
        print(f"Processed {processed_count} documents")

print(f"Total processed: {processed_count} documents")

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch5

docs

bulk-operations.md

cluster-operations.md

document-operations.md

index-management.md

index.md

search-operations.md

transport-connection.md

tile.json