CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-opensearch-py

Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities

Pending
Overview
Eval results
Files

docs/helper-functions.md

Helper Functions

High-level utility functions for common operations like bulk indexing, scanning large result sets, and data reindexing with built-in error handling and performance optimizations. These functions provide simplified interfaces for complex operations.

Capabilities

Bulk Operations

Efficient bulk processing for high-throughput document operations.

def bulk(client, actions, index=None, doc_type=None, **kwargs):
    """
    Perform bulk indexing, updating, and deleting operations.

    Collects ``actions`` into chunks and sends each chunk through the
    client's bulk endpoint, with optional retry/backoff per chunk.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries or generator
    - index (str, optional): Default index for actions without _index
    - doc_type (str, optional): Default document type (deprecated)
    - chunk_size (int, optional): Number of docs per chunk (default: 500)
    - max_chunk_bytes (int, optional): Maximum size per chunk in bytes
    - refresh (str/bool, optional): Refresh policy for operations
    - timeout (str, optional): Request timeout
    - max_retries (int, optional): Maximum number of retries (default: 0)
    - initial_backoff (int, optional): Initial backoff time in seconds (default: 2)
    - max_backoff (int, optional): Maximum backoff time in seconds (default: 600)
    - yield_ok (bool, optional): Yield successful operations (default: True)

    NOTE(review): ``thread_count`` and ``queue_size`` were removed from
    this list — they are parameters of ``parallel_bulk()`` (see its own
    signature below), not ``bulk()``; unknown kwargs are forwarded to the
    underlying client call.

    Action format:
    {
        '_op_type': 'index',  # 'index', 'create', 'update', 'delete'
        '_index': 'my-index',
        '_id': 'doc-id',
        '_source': {'field': 'value'}  # For index/create/update
    }

    Returns:
    Tuple of (success_count, errors) — not an iterator. The usage
    examples in this document unpack the result directly:
    ``successes, failures = bulk(...)``.

    Raises:
    BulkIndexError: If there are failed operations and errors are not ignored
    """

def async_bulk(client, actions, **kwargs):
    """
    Asynchronous version of bulk().

    Parameters: Same as bulk() function; ``client`` must be an async
    client instance (e.g. AsyncOpenSearch).

    Returns:
    Awaitable resolving to a tuple of (success_count, errors), matching
    bulk() — the async usage example in this document awaits it and
    unpacks the tuple, so the previous "async iterator" description
    was inconsistent.
    """

def streaming_bulk(client, actions, **kwargs):
    """
    Streaming bulk operations that yield results as they complete.

    This is a lazy generator: no requests are sent until it is iterated,
    and results arrive incrementally rather than after all chunks finish.

    Parameters: Same as bulk() function

    Yields:
    Tuples of (success, action_result) for each operation, where
    ``success`` is a bool and ``action_result`` is the per-item response
    dictionary.
    """

def async_streaming_bulk(client, actions, **kwargs):
    """
    Asynchronous streaming bulk operations.

    Async counterpart of streaming_bulk(); consume with ``async for``.

    Parameters: Same as bulk() function; ``client`` must be an async
    client instance.

    Async yields:
    Tuples of (success, action_result) for each operation
    """

def parallel_bulk(client, actions, thread_count=4, **kwargs):
    """
    Parallel bulk operations using threading for improved performance.

    Parameters:
    - client: OpenSearch client instance
    - actions: Iterable of action dictionaries
    - thread_count (int): Number of parallel threads (default: 4)
    - queue_size (int, optional): Size of the chunk task queue (default: 4)
    - Other parameters same as bulk() function

    Yields:
    Tuples of (success, action_result) for each operation. This is a
    lazy generator — it must be iterated for any work to happen.
    NOTE(review): with multiple worker threads, yielded results may not
    match the input order — confirm before relying on ordering.
    """

Scanning Operations

Efficient scanning for processing large result sets.

def scan(client, query=None, scroll='5m', **kwargs):
    """
    Scan search results for large datasets using the scroll API.

    Lazily iterates over every hit of a search, so result sets larger
    than a single page can be processed without loading them all.

    Parameters:
    - client: OpenSearch client instance
    - query (dict, optional): Search query body
    - scroll (str, optional): How long the server keeps the scroll
      context alive between fetches (default: '5m')
    - index (str/list, optional): Index name(s)
    - doc_type (str/list, optional): Document type(s) (deprecated)
    - size (int, optional): Number of results per shard (default: 1000)
    - request_timeout (float, optional): Request timeout in seconds
    - clear_scroll (bool, optional): Clear scroll context on completion (default: True)
    - scroll_kwargs (dict, optional): Additional scroll parameters
    - preserve_order (bool, optional): Preserve result order (default: False).
      NOTE(review): presumably forces sorted scrolling and is slower than
      the default — confirm before enabling on very large scans.

    Query format:
    {
        'query': {
            'match_all': {}
        },
        'sort': ['_doc']  # Recommended for performance
    }

    Yields:
    Individual document hits (dicts) from search results

    Raises:
    ScanError: If scan operation fails
    """

def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Asynchronous version of scan operations.

    Parameters: Same as scan() function; ``client`` must be an async
    client instance.

    Async yields:
    Individual document hits from search results; consume with
    ``async for``.
    """

Reindexing Operations

Copy documents between indices with optional transformations.

def reindex(client, source_index, target_index, query=None, **kwargs):
    """
    Reindex documents from source to target index.

    NOTE(review): this appears to be the client-side helper (scan the
    source, bulk-write the target) rather than the server-side
    ``_reindex`` API — confirm which is appropriate for large indices.

    Parameters:
    - client: OpenSearch client instance
    - source_index (str): Source index name
    - target_index (str): Target index name
    - query (dict, optional): Query to filter source documents
    - chunk_size (int, optional): Bulk operation chunk size (default: 500)
    - scroll (str, optional): Scroll timeout (default: '5m')
    - op_type (str, optional): Operation type ('index' or 'create', default: 'index')
    - transform (callable, optional): Function applied to each document
      before it is written to the target
    - target_client: Different client for the target index (enables
      cross-cluster copies); defaults to ``client``

    Transform function signature:
    def transform_doc(doc):
        # Modify doc['_source'], doc['_id'], etc.
        return doc

    Returns:
    Tuple of (success_count, failed_operations)

    Raises:
    ReindexError: If reindexing fails
    """

def async_reindex(client, source_index, target_index, **kwargs):
    """
    Asynchronous version of reindex operations.

    Parameters: Same as reindex() function; ``client`` must be an async
    client instance.

    Returns:
    Awaitable resolving to a tuple of (success_count, failed_operations)
    """

Utility Functions

Additional helper functions for common operations.

def expand_action(data):
    """
    Expand a single document into a bulk action format.

    Parameters:
    - data: Document data or action dictionary. Underscore-prefixed keys
      (``_op_type``, ``_index``, ``_id``, ...) are treated as action
      metadata, per the action format documented for bulk().

    Returns:
    Properly formatted bulk action.
    NOTE(review): in opensearch-py this returns an (action, data) pair
    ready for serialization — confirm against the installed version.
    """

def _chunk_actions(actions, chunk_size, max_chunk_bytes):
    """
    Internal function to chunk actions for bulk operations.

    A chunk is closed when it reaches ``chunk_size`` actions or would
    exceed ``max_chunk_bytes`` serialized bytes, whichever comes first.

    Parameters:
    - actions: Iterable of actions
    - chunk_size: Maximum actions per chunk
    - max_chunk_bytes: Maximum bytes per chunk

    Yields:
    Chunks of actions sized for a single bulk request
    """

def _process_bulk_chunk(client, chunk, **kwargs):
    """
    Internal function to process a single bulk chunk.

    Sends one pre-chunked batch through the client's bulk endpoint and
    translates the response into per-item results.

    Parameters:
    - client: OpenSearch client instance
    - chunk: List of actions sent in a single request
    - kwargs: Additional bulk parameters forwarded to the client call

    Returns:
    Processed per-item results for the chunk
    """

Usage Examples

Basic Bulk Operations

from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

# Connect to a local OpenSearch node.
client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Prepare bulk actions: two indexes, one partial update, one delete.
actions = [
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'title': 'Laptop Computer',
            'price': 999.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '2',
        '_source': {
            'title': 'Wireless Mouse',
            'price': 29.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'doc': {
                'in_stock': True
            }
        }
    },
    {
        '_op_type': 'delete',
        '_index': 'products',
        '_id': '3'
    }
]

# Execute bulk operations.
# NOTE: thread_count was removed from this call — it is a parameter of
# parallel_bulk(), not bulk(); bulk() forwards unknown kwargs to the
# underlying client request, where it would be rejected.
successes, failures = bulk(
    client,
    actions,
    chunk_size=100,
    timeout='60s'
)

print(f"Successful operations: {successes}")
if failures:
    print(f"Failed operations: {len(failures)}")
    for failure in failures:
        print(f"  Error: {failure}")

Streaming Bulk with Generator

from opensearchpy.helpers import streaming_bulk

def generate_docs():
    """Yield one bulk index action per synthetic document."""
    stamp = '2024-01-01T00:00:00Z'
    for doc_num in range(10000):
        action = {
            '_op_type': 'index',
            '_index': 'large-dataset',
            '_id': str(doc_num),
            '_source': {
                'id': doc_num,
                'value': f'Document {doc_num}',
                'timestamp': stamp,
            },
        }
        yield action

# Stream bulk operations lazily; results arrive chunk by chunk.
bulk_results = streaming_bulk(
    client,
    generate_docs(),
    chunk_size=500,
    max_retries=3
)
for ok, item in bulk_results:
    if ok:
        print(f"Indexed document: {item['index']['_id']}")
    else:
        print(f"Failed to index: {item}")

Large Dataset Scanning

from opensearchpy.helpers import scan

# Scan all 2024 documents in an index.
search_body = {
    'query': {
        'range': {
            'timestamp': {
                'gte': '2024-01-01',
                'lte': '2024-12-31'
            }
        }
    },
    'sort': ['_doc']  # More efficient than default scoring
}

doc_count = 0
hits = scan(
    client,
    query=search_body,
    index='large-index',
    size=1000,  # Documents per shard per request
    scroll='10m'
)
for hit in hits:
    # Hand each document's source to the application-level processor.
    process_document(hit['_source'])
    doc_count += 1

    if doc_count % 10000 == 0:
        print(f"Processed {doc_count} documents")

print(f"Total processed: {doc_count} documents")

Reindexing with Transformation

from opensearchpy.helpers import reindex

def transform_document(doc):
    """Mutate and return a document while it is being reindexed."""
    source = doc['_source']

    # Stamp every copied document with a processing timestamp.
    source['processed_at'] = '2024-01-01T00:00:00Z'

    # Migrate the legacy field name when present (EAFP).
    try:
        source['new_field'] = source.pop('old_field')
    except KeyError:
        pass

    # Drop scratch data that should not reach the target index.
    source.pop('temp_field', None)

    # Prefix IDs so old and new documents cannot collide.
    doc['_id'] = f"new_{doc['_id']}"

    return doc

# Reindex only active documents, transforming each one on the way.
filter_query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'status': 'active'}}
            ]
        }
    }
}

ok_count, failed_items = reindex(
    client,
    source_index='old-index',
    target_index='new-index',
    query=filter_query,
    transform=transform_document,
    chunk_size=200
)

print(f"Successfully reindexed: {ok_count} documents")
if failed_items:
    print(f"Failed operations: {len(failed_items)}")

Parallel Bulk Processing

from opensearchpy.helpers import parallel_bulk
import json

def read_json_file(filename):
    """Yield one bulk index action per JSON line in *filename*."""
    with open(filename, 'r') as handle:
        for raw_line in handle:
            yield {
                '_op_type': 'index',
                '_index': 'imported-data',
                '_source': json.loads(raw_line)
            }

# Process large file with parallel bulk.
processed = 0
errors = []

for success, info in parallel_bulk(
    client,
    read_json_file('large_dataset.jsonl'),
    thread_count=8,
    chunk_size=1000,
    max_retries=3,
    initial_backoff=2,
    max_backoff=600
):
    if success:
        processed += 1
        # Report progress exactly once per 10000 successes. The original
        # evaluated this on every iteration, so during a run of errors it
        # printed repeatedly while `processed` sat at 0 (0 % 10000 == 0)
        # or at any other multiple of 10000.
        if processed % 10000 == 0:
            print(f"Processed: {processed}, Errors: {len(errors)}")
    else:
        errors.append(info)

print(f"Final: Processed {processed}, Errors: {len(errors)}")

Async Bulk Operations

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    """Index 1000 small documents through the async bulk helper."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    bulk_actions = [
        {
            '_op_type': 'index',
            '_index': 'async-index',
            '_id': str(doc_num),
            '_source': {'value': doc_num}
        }
        for doc_num in range(1000)
    ]

    # async_bulk resolves to a (successes, failures) tuple.
    ok_count, failed_ops = await async_bulk(
        client,
        bulk_actions,
        chunk_size=100
    )

    print(f"Async bulk: {ok_count} successful, {len(failed_ops)} failed")

    await client.close()

# Drive the coroutine to completion.
asyncio.run(async_bulk_example())

Error Handling and Retry Logic

from opensearchpy.helpers import bulk
from opensearchpy.exceptions import BulkIndexError, ConnectionError

def robust_bulk_index(client, documents, max_attempts=3):
    """Bulk-index documents with an outer retry loop around bulk().

    Parameters:
    - client: OpenSearch client instance
    - documents: Iterable of plain document dicts to index
    - max_attempts (int): Total attempts before giving up (default: 3)

    Returns:
    Tuple of (success_count, failed_ops) from the last attempt; an empty
    failure list means everything was indexed.

    Raises:
    BulkIndexError, ConnectionError: re-raised if the final attempt fails.
    """
    import time  # hoisted out of the retry loop (was re-imported per retry)

    actions = [
        {
            '_op_type': 'index',
            '_index': 'robust-index',
            '_source': doc
        }
        for doc in documents
    ]

    # Initialized up front so the final return cannot raise
    # UnboundLocalError when max_attempts <= 0 skips the loop entirely.
    success_count, failed_ops = 0, actions

    for attempt in range(max_attempts):
        try:
            success_count, failed_ops = bulk(
                client,
                actions,
                max_retries=2,
                initial_backoff=2,
                max_backoff=60
            )

            if not failed_ops:
                print(f"All {success_count} documents indexed successfully")
                return success_count, []

            # NOTE(review): bulk() failure items are per-item error/response
            # dicts, not action dicts — re-submitting them verbatim assumes
            # the helper accepts that shape; verify against bulk()'s contract.
            actions = failed_ops
            print(f"Attempt {attempt + 1}: {len(failed_ops)} operations failed, retrying...")

        except (BulkIndexError, ConnectionError) as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                raise

            # Exponential backoff before the next attempt.
            time.sleep(2 ** attempt)

    return success_count, failed_ops

# Exercise the robust bulk indexer with 1000 synthetic documents.
documents = [{'id': num, 'data': f'value_{num}'} for num in range(1000)]
success, failures = robust_bulk_index(client, documents)

Install with Tessl CLI

npx tessl i tessl/pypi-opensearch-py

docs

async-operations.md

authentication.md

core-client.md

document-modeling.md

dsl-queries.md

helper-functions.md

index.md

namespaced-apis.md

plugin-apis.md

tile.json