CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

helper-functions.mddocs/

Helper Functions

Utility functions for common Elasticsearch operations including bulk indexing, scanning large result sets, and reindexing data. These helpers simplify complex operations and provide optimized implementations for common use cases.

Capabilities

Bulk Operations

Efficient bulk indexing with automatic batching, error handling, and progress tracking.

def bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Perform bulk indexing operations.
    
    Parameters:
    - client: Elasticsearch client instance
    - actions: Iterable of action dictionaries or documents
    - index: Default index name for actions
    - doc_type: Default document type (deprecated)
    - routing: Default routing value
    - pipeline: Default ingest pipeline
    - refresh: Refresh policy for operations
    - timeout: Request timeout
    - chunk_size: Number of documents per chunk
    - max_chunk_bytes: Maximum chunk size in bytes
    - thread_count: Number of parallel threads
    - queue_size: Thread pool queue size
    - expand_action_callback: Callback to expand actions
    - raise_on_exception: Whether to raise on exceptions
    - raise_on_error: Whether to raise on API errors
    - ignore_status: HTTP status codes to ignore
    
    Returns:
    Tuple of (success_count, failed_operations)
    """

def streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Generator that yields bulk operation results as they complete.
    
    Parameters: Same as bulk()
    
    Yields:
    Tuples of (success, info) for each chunk processed
    """

def parallel_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    thread_count: int = 4,
    queue_size: int = 4,
    expand_action_callback=None,
    ignore_status=(),
    **kwargs
):
    """
    Parallel bulk indexing using multiple threads.
    
    Parameters: Same as bulk() with additional thread control
    
    Yields:
    Tuples of (success, info) for each chunk processed
    """

Scanning Operations

Efficiently iterate through large result sets using scroll API.

def scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Scan and scroll through all documents matching a query.
    
    Parameters:
    - client: Elasticsearch client instance
    - query: Query to execute (default: match_all)
    - scroll: Scroll context timeout
    - raise_on_error: Whether to raise on errors
    - preserve_order: Whether to preserve result ordering
    - size: Number of documents per shard per batch
    - request_timeout: Request timeout
    - clear_scroll: Whether to clear scroll context when done
    - scroll_kwargs: Additional arguments for scroll requests
    - **kwargs: Additional search parameters
    
    Yields:
    Individual document hits
    """

Reindexing Operations

Copy documents between indices with optional transformation.

def reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Reindex documents from source to target index.
    
    Parameters:
    - client: Source Elasticsearch client
    - source_index: Source index name
    - target_index: Target index name
    - query: Query to filter source documents
    - target_client: Target client (if different from source)
    - chunk_size: Bulk operation chunk size
    - scroll: Scroll timeout for scanning
    - scan_kwargs: Additional scan arguments
    - bulk_kwargs: Additional bulk arguments
    - transform_doc_callback: Function to transform documents
    
    Returns:
    Tuple of (success_count, failed_operations)
    """

Asynchronous Helper Functions

Async versions of helper functions for use with AsyncElasticsearch.

async def async_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of bulk operation.
    
    Parameters: Same as bulk()
    
    Returns:
    Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(
    client,
    actions,
    index: Optional[str] = None,
    doc_type: Optional[str] = None,
    routing: Optional[str] = None,
    pipeline: Optional[str] = None,
    refresh: Optional[str] = None,
    timeout: Optional[str] = None,
    chunk_size: int = 500,
    max_chunk_bytes: int = 104857600,
    expand_action_callback=None,
    raise_on_exception: bool = True,
    raise_on_error: bool = True,
    ignore_status=(),
    **kwargs
):
    """
    Async generator for streaming bulk operations.
    
    Parameters: Same as streaming_bulk()
    
    Yields:
    Tuples of (success, info) for each chunk processed
    """

async def async_scan(
    client,
    query: Optional[Dict[str, Any]] = None,
    scroll: str = "5m",
    raise_on_error: bool = True,
    preserve_order: bool = False,
    size: int = 1000,
    request_timeout: Optional[float] = None,
    clear_scroll: bool = True,
    scroll_kwargs: Optional[Dict] = None,
    **kwargs
):
    """
    Async version of scan operation.
    
    Parameters: Same as scan()
    
    Yields:
    Individual document hits
    """

async def async_reindex(
    client,
    source_index: str,
    target_index: str,
    query: Optional[Dict[str, Any]] = None,
    target_client: Optional[object] = None,
    chunk_size: int = 500,
    scroll: str = "5m",
    scan_kwargs: Optional[Dict] = None,
    bulk_kwargs: Optional[Dict] = None,
    transform_doc_callback=None,
    **kwargs
) -> Tuple[int, List[Dict]]:
    """
    Async version of reindex operation.
    
    Parameters: Same as reindex()
    
    Returns:
    Tuple of (success_count, failed_operations)
    """

Utility Functions

Additional utility functions for common operations.

def expand_action(action: Dict[str, Any]) -> Dict[str, Any]:
    """
    Expand a shorthand action dictionary to full format.
    
    Parameters:
    - action: Action dictionary to expand
    
    Returns:
    Expanded action dictionary
    """

Helper Exceptions

Exception types specific to helper operations.

class BulkIndexError(Exception):
    """
    Exception raised when bulk operations encounter errors.
    
    Attributes:
    - errors: List of individual operation errors
    """
    def __init__(self, message: str, errors: List[Dict]): ...
    @property
    def errors(self) -> List[Dict]: ...

class ScanError(Exception):
    """
    Exception raised during scan operations.
    
    Attributes:
    - scroll_id: Scroll context ID if available
    - partial_results: Results obtained before error
    """
    def __init__(self, message: str, scroll_id: Optional[str] = None): ...

Usage Examples

Bulk Indexing

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

client = Elasticsearch(hosts=['http://localhost:9200'])

# Prepare documents for bulk indexing
def generate_docs():
    for i in range(10000):
        yield {
            "_index": "products",
            "_id": i,
            "_source": {
                "name": f"Product {i}",
                "price": i * 10.99,
                "category": f"Category {i % 5}"
            }
        }

try:
    # Bulk index documents
    success_count, failed_ops = bulk(
        client,
        generate_docs(),
        chunk_size=1000,
        timeout='60s',
        refresh='wait_for'
    )
    
    print(f"Successfully indexed: {success_count} documents")
    if failed_ops:
        print(f"Failed operations: {len(failed_ops)}")
        
except BulkIndexError as e:
    print(f"Bulk indexing failed: {e}")
    for error in e.errors:
        print(f"Error: {error}")

Streaming Bulk Operations

from elasticsearch.helpers import streaming_bulk

def generate_large_dataset():
    for i in range(100000):
        yield {
            "_index": "logs",
            "_source": {
                "timestamp": f"2024-01-01T{i:02d}:00:00Z",
                "level": "INFO" if i % 2 == 0 else "WARN",
                "message": f"Log message {i}",
                "service": f"service-{i % 10}"
            }
        }

# Stream bulk operations with progress tracking
total_indexed = 0
for success, info in streaming_bulk(
    client,
    generate_large_dataset(),
    chunk_size=5000,
    max_chunk_bytes=10 * 1024 * 1024  # 10MB chunks
):
    if success:
        total_indexed += 1
        if total_indexed % 10 == 0:  # Progress every 10 chunks
            print(f"Indexed {total_indexed * 5000} documents...")
    else:
        print(f"Failed chunk: {info}")

print(f"Total documents indexed: {total_indexed * 5000}")

Scanning Large Result Sets

from elasticsearch.helpers import scan

# Scan through all documents in an index
total_docs = 0
for doc in scan(
    client,
    index="products",
    query={"match_all": {}},
    scroll='2m',
    size=1000
):
    # Process each document
    total_docs += 1
    product_name = doc['_source']['name']
    product_price = doc['_source']['price']
    
    # Example: Update price with 10% discount
    if product_price > 100:
        client.update(
            index="products",
            id=doc['_id'],
            document={"price": product_price * 0.9}
        )

print(f"Processed {total_docs} documents")

Advanced Scanning with Query

# Scan with complex query and field filtering
query = {
    "bool": {
        "must": [
            {"range": {"price": {"gte": 50, "lte": 1000}}},
            {"term": {"status": "active"}}
        ],
        "must_not": [
            {"term": {"category.keyword": "discontinued"}}
        ]
    }
}

for doc in scan(
    client,
    index="products",
    query=query,
    _source=["name", "price", "category"],  # Only retrieve specific fields
    scroll='5m',
    size=2000,
    preserve_order=True
):
    print(f"Product: {doc['_source']['name']}, Price: {doc['_source']['price']}")

Reindexing with Transformation

from elasticsearch.helpers import reindex

def transform_document(doc):
    """Transform documents during reindex."""
    source = doc['_source']
    
    # Add new computed field
    source['price_tier'] = 'high' if source['price'] > 1000 else 'low'
    
    # Rename field
    if 'desc' in source:
        source['description'] = source.pop('desc')
    
    # Convert price to integer cents
    source['price_cents'] = int(source['price'] * 100)
    
    return doc

# Reindex with transformation
success_count, failed_ops = reindex(
    client,
    source_index="products_v1",
    target_index="products_v2",
    query={"term": {"status": "active"}},  # Only reindex active products
    transform_doc_callback=transform_document,
    chunk_size=1000
)

print(f"Reindexed {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")

Parallel Bulk Processing

from elasticsearch.helpers import parallel_bulk
import threading

def generate_docs_from_database():
    """Generate documents from database query."""
    # Simulate database connection and query
    for i in range(50000):
        yield {
            "_index": "analytics",
            "_source": {
                "user_id": i % 1000,
                "event": f"action_{i % 10}",
                "timestamp": "2024-01-01T00:00:00Z",
                "value": i * 1.5
            }
        }

# Use parallel bulk for high-throughput indexing
results = []
for success, info in parallel_bulk(
    client,
    generate_docs_from_database(),
    chunk_size=2000,
    thread_count=8,  # Use 8 parallel threads
    queue_size=8,
    timeout='120s'
):
    results.append((success, info))
    if len(results) % 100 == 0:
        print(f"Processed {len(results)} chunks")

successful_chunks = sum(1 for success, _ in results if success)
print(f"Successfully processed {successful_chunks} chunks")

Async Helper Usage

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk, async_scan

async def async_bulk_example():
    """Example of async bulk operations."""
    async_client = AsyncElasticsearch(hosts=['http://localhost:9200'])
    
    # Prepare async documents
    async def async_generate_docs():
        for i in range(1000):
            yield {
                "_index": "async_test",
                "_id": i,
                "_source": {"value": i, "squared": i * i}
            }
    
    try:
        # Async bulk indexing
        success_count, failed_ops = await async_bulk(
            async_client,
            async_generate_docs(),
            chunk_size=100
        )
        
        print(f"Async indexed: {success_count} documents")
        
        # Async scanning
        total_scanned = 0
        async for doc in async_scan(
            async_client,
            index="async_test",
            query={"match_all": {}}
        ):
            total_scanned += 1
            
        print(f"Async scanned: {total_scanned} documents")
        
    finally:
        await async_client.close()

# Run async example
asyncio.run(async_bulk_example())

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json