CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-opensearch-py

Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities

Pending
Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Asynchronous client operations using Python's asyncio for high-performance applications requiring concurrent OpenSearch operations. The async client provides the same API as the synchronous client but with async/await support.

Capabilities

Async Client

Main asynchronous client class with same methods as synchronous client.

class AsyncOpenSearch:
    # Documentation stub: bodies are intentionally empty. Every method is a
    # coroutine and must be awaited; the method surface mirrors the
    # synchronous OpenSearch client described elsewhere in these docs.

    def __init__(self, hosts=None, **kwargs):
        """
        Initialize async OpenSearch client.

        Parameters: Same as OpenSearch client (``hosts`` lists the nodes to
        connect to; remaining keyword arguments configure the transport --
        TLS, pooling, retries, etc.).
        """
    
    async def ping(self, **kwargs):
        """Test connection to the cluster (async). Must be awaited."""
    
    async def info(self, **kwargs):
        """Get basic cluster information (async)."""
    
    async def search(self, index=None, body=None, **kwargs):
        """Execute search query (async); ``body`` carries the query DSL dict."""
    
    async def index(self, index, body, id=None, **kwargs):
        """Index a document (async); ``body`` is the document source dict."""
    
    async def get(self, index, id, **kwargs):
        """Retrieve document by ID (async)."""
    
    async def update(self, index, id, body, **kwargs):
        """Update document (async); ``body`` describes the update."""
    
    async def delete(self, index, id, **kwargs):
        """Delete document (async)."""
    
    async def bulk(self, body, index=None, **kwargs):
        """Bulk operations (async); ``body`` holds the bulk actions."""
    
    async def count(self, index=None, body=None, **kwargs):
        """Count documents (async), optionally filtered by a query ``body``."""
    
    async def scroll(self, scroll_id, scroll='5m', **kwargs):
        """Continue scrolling (async) an existing scroll context."""
    
    async def clear_scroll(self, scroll_id=None, **kwargs):
        """Clear scroll context (async) to free server-side resources."""
    
    async def close(self):
        """Close the client and cleanup resources (connections, sessions)."""

Async Helper Functions

Asynchronous versions of helper functions for bulk operations and scanning.

async def async_bulk(client, actions, **kwargs):
    """
    Async bulk operations helper; awaitable counterpart of ``bulk()``.

    Parameters: Same as sync bulk() function -- ``client`` is an
    AsyncOpenSearch instance, ``actions`` an iterable of action dicts.

    Returns:
    Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(client, actions, **kwargs):
    """
    Async streaming bulk operations; consume with ``async for``.

    Parameters: Same as sync streaming_bulk() function (``actions`` may be
    an async generator, as shown in the streaming example below).

    Async yields:
    Tuples of (success, action_result) for each operation
    """

async def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Async scan for large result sets; consume with ``async for``.

    Parameters: Same as sync scan() function (``scroll`` sets the scroll
    context keep-alive, default '5m').

    Async yields:
    Individual document hits from search results
    """

async def async_reindex(client, source_index, target_index, **kwargs):
    """
    Async reindexing from ``source_index`` into ``target_index``.

    Parameters: Same as sync reindex() function

    Returns:
    Tuple of (success_count, failed_operations)
    """

Usage Examples

Basic Async Client Usage

import asyncio
from opensearchpy import AsyncOpenSearch

async def basic_async_example():
    """Demonstrate ping, info, index, and search with the async client."""
    # Build the async client (cert verification off for local development)
    es = AsyncOpenSearch(
        hosts=[{'host': 'localhost', 'port': 9200}],
        use_ssl=True,
        verify_certs=False
    )

    try:
        # Verify the cluster is reachable
        alive = await es.ping()
        print(f"Connection successful: {alive}")

        # Fetch basic cluster metadata
        cluster_info = await es.info()
        print(f"Cluster: {cluster_info['cluster_name']}")

        # Index a single document
        document = {
            'title': 'Async Document',
            'content': 'This document was indexed asynchronously',
            'timestamp': '2024-01-01T00:00:00Z'
        }
        index_response = await es.index(
            index='async-index',
            id='async-doc-1',
            body=document
        )
        print(f"Document indexed: {index_response['result']}")

        # Run a match query against the title field
        hits = await es.search(
            index='async-index',
            body={'query': {'match': {'title': 'Async'}}}
        )
        print(f"Found {hits['hits']['total']['value']} documents")

    finally:
        # Always release connections, even when an operation failed
        await es.close()

# Run async example
asyncio.run(basic_async_example())

Concurrent Operations

import asyncio
from opensearchpy import AsyncOpenSearch

async def concurrent_operations_example():
    """Index ten documents concurrently and tally successes vs failures."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # One index coroutine per document, built up front
        pending = [
            client.index(
                index='concurrent-index',
                id=str(n),
                body={
                    'id': n,
                    'title': f'Concurrent Document {n}',
                    'content': f'Content for document {n}'
                }
            )
            for n in range(10)
        ]

        # Run everything at once; exceptions come back as result values
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        successful = 0
        failed = 0
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                failed += 1
                print(f"Failed: {outcome}")
            else:
                successful += 1

        print(f"Concurrent indexing: {successful} successful, {failed} failed")

    finally:
        await client.close()

asyncio.run(concurrent_operations_example())

Async Bulk Operations

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    """Bulk-index 1000 documents via the async_bulk helper."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Assemble the bulk action list
        actions = []
        for n in range(1000):
            actions.append({
                '_op_type': 'index',
                '_index': 'bulk-async-index',
                '_id': str(n),
                '_source': {
                    'title': f'Bulk Document {n}',
                    'value': n * 10,
                    'category': 'async-bulk'
                }
            })

        # Ship actions in chunks of 100, retrying transient failures
        ok_count, failures = await async_bulk(
            client,
            actions,
            chunk_size=100,
            max_retries=3
        )

        print(f"Bulk operation: {ok_count} successful")
        if failures:
            print(f"Failed operations: {len(failures)}")

    finally:
        await client.close()

asyncio.run(async_bulk_example())

Async Streaming Bulk

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_streaming_bulk

async def async_streaming_bulk_example():
    """Stream 500 generated documents through async_streaming_bulk."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    async def doc_stream():
        """Async generator yielding one bulk action per document."""
        for seq in range(500):
            yield {
                '_op_type': 'index',
                '_index': 'streaming-async-index',
                '_id': str(seq),
                '_source': {
                    'title': f'Streaming Document {seq}',
                    'timestamp': '2024-01-01T00:00:00Z',
                    'value': seq
                }
            }
            # Simulate async data processing every 50 docs
            if seq % 50 == 0:
                await asyncio.sleep(0.1)

    try:
        processed = 0
        errors = []

        # Consume per-operation results as chunks of 50 are flushed
        async for ok, detail in async_streaming_bulk(
            client,
            doc_stream(),
            chunk_size=50
        ):
            if ok:
                processed += 1
            else:
                errors.append(detail)

            if processed % 100 == 0:
                print(f"Processed: {processed}, Errors: {len(errors)}")

        print(f"Streaming bulk completed: {processed} processed, {len(errors)} errors")

    finally:
        await client.close()

asyncio.run(async_streaming_bulk_example())

Async Scanning

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_scan

async def async_scan_example():
    """Iterate a large result set with the async_scan helper."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        # Range filter; _doc sort order is the cheapest for scrolling
        scan_query = {
            'query': {
                'range': {
                    'value': {
                        'gte': 0,
                        'lt': 1000
                    }
                }
            },
            'sort': ['_doc']
        }

        seen = 0

        # Pull hits page by page without holding the whole set in memory
        async for hit in async_scan(
            client,
            query=scan_query,
            index='large-async-index',
            size=100,
            scroll='5m'
        ):
            seen += 1
            # Log progress every 1000 documents
            if seen % 1000 == 0:
                print(f"Scanned {seen} documents")

        print(f"Async scan completed: {seen} documents processed")

    finally:
        await client.close()

asyncio.run(async_scan_example())

Async Context Manager

import asyncio
from opensearchpy import AsyncOpenSearch

class AsyncOpenSearchManager:
    """Async context manager owning an AsyncOpenSearch client's lifecycle.

    Client construction is deferred until the context is entered; the
    client is closed automatically when the context exits.
    """

    def __init__(self, **kwargs):
        # Stash constructor kwargs; no client exists until __aenter__
        self.client_kwargs = kwargs
        self.client = None

    async def __aenter__(self):
        self.client = AsyncOpenSearch(**self.client_kwargs)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close only if __aenter__ actually built a client
        if self.client:
            await self.client.close()

async def context_manager_example():
    """Run a small indexing/search session with automatic client cleanup."""
    # Use async context manager for automatic cleanup
    async with AsyncOpenSearchManager(
        hosts=[{'host': 'localhost', 'port': 9200}]
    ) as client:

        cluster = await client.info()
        print(f"Connected to: {cluster['cluster_name']}")

        # Fire five index requests concurrently
        pending = [
            client.index(
                index='context-manager-index',
                id=str(n),
                body={'value': n, 'title': f'Document {n}'}
            )
            for n in range(5)
        ]
        results = await asyncio.gather(*pending)
        print(f"Indexed {len(results)} documents")

        # Search everything back
        search_result = await client.search(
            index='context-manager-index',
            body={'query': {'match_all': {}}}
        )
        print(f"Found {search_result['hits']['total']['value']} documents")

    # Client is automatically closed here
    print("Client closed automatically")

asyncio.run(context_manager_example())

Async with Connection Pool

import asyncio
from opensearchpy import AsyncOpenSearch

async def connection_pool_example():
    """Fan out 100 mixed operations over a multi-node connection pool.

    Demonstrates pool sizing, node sniffing, and retry settings, then
    measures the wall-clock time of the concurrent batch.
    """
    # Client with connection pool for high concurrency
    client = AsyncOpenSearch(
        hosts=[
            {'host': 'node1.cluster.com', 'port': 9200},
            {'host': 'node2.cluster.com', 'port': 9200},
            {'host': 'node3.cluster.com', 'port': 9200}
        ],
        # Connection pool settings
        maxsize=20,  # Maximum connections per host
        # Health checking
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
        # Retry settings
        max_retries=3,
        retry_on_timeout=True
    )

    try:
        # Round-robin three operation types across 100 concurrent tasks
        tasks = []
        for i in range(100):
            if i % 3 == 0:
                task = client.index(
                    index='pool-index',
                    id=str(i),
                    body={'value': i}
                )
            elif i % 3 == 1:
                task = client.search(
                    index='pool-index',
                    body={'query': {'match_all': {}}}
                )
            else:
                task = client.count(index='pool-index')

            tasks.append(task)

        # Time the batch with the running loop's monotonic clock.
        # FIX: asyncio.get_event_loop() inside a coroutine is deprecated
        # since Python 3.10; get_running_loop() is the supported call.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = loop.time()

        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful

        print("Concurrent operations with pool:")
        print(f"  Total: {len(results)}")
        print(f"  Successful: {successful}")
        print(f"  Failed: {failed}")
        print(f"  Time: {end_time - start_time:.2f} seconds")

    finally:
        await client.close()

asyncio.run(connection_pool_example())

Async Error Handling

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.exceptions import (
    NotFoundError,
    RequestError,
    ConnectionError,
    TransportError
)

async def error_handling_example():
    """Show per-task and whole-batch error handling with gather()."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    try:
        tasks = []

        # A document that should index cleanly
        tasks.append(client.index(
            index='error-test',
            id='success',
            body={'status': 'success'}
        ))

        # A document that may trip a mapping/validation error
        tasks.append(client.index(
            index='error-test',
            id='malformed',
            body={'date_field': 'invalid-date-format'}
        ))

        # Handle an expected failure inside the coroutine itself
        async def get_nonexistent():
            try:
                return await client.get(index='error-test', id='nonexistent')
            except NotFoundError:
                return {'error': 'Document not found'}

        tasks.append(get_nonexistent())

        # return_exceptions=True turns failures into result values
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        for idx, outcome in enumerate(outcomes):
            if isinstance(outcome, Exception):
                print(f"Task {idx} failed: {type(outcome).__name__}: {outcome}")
            else:
                print(f"Task {idx} succeeded")

    except ConnectionError as e:
        print(f"Connection failed: {e}")

    except TransportError as e:
        print(f"Transport error: {e}")

    finally:
        await client.close()

asyncio.run(error_handling_example())

Performance Comparison

import asyncio
import time
from opensearchpy import OpenSearch, AsyncOpenSearch

async def performance_comparison():
    """Compare N sequential sync index calls against N concurrent async ones.

    Prints the wall-clock duration of each approach and the speedup ratio.
    """
    # Synchronous client
    sync_client = OpenSearch([{'host': 'localhost', 'port': 9200}])

    # Asynchronous client
    async_client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    num_operations = 50

    try:
        # Synchronous: one request at a time
        print("Running synchronous operations...")
        sync_start = time.time()

        for i in range(num_operations):
            sync_client.index(
                index='perf-test',
                id=f'sync-{i}',
                body={'value': i, 'type': 'sync'}
            )

        sync_duration = time.time() - sync_start

        # Asynchronous: all requests in flight concurrently
        print("Running asynchronous operations...")
        async_start = time.time()

        tasks = [
            async_client.index(
                index='perf-test',
                id=f'async-{i}',
                body={'value': i, 'type': 'async'}
            )
            for i in range(num_operations)
        ]

        await asyncio.gather(*tasks)

        async_duration = time.time() - async_start

        # Results
        print(f"\nPerformance comparison ({num_operations} operations):")
        print(f"  Synchronous: {sync_duration:.2f} seconds")
        print(f"  Asynchronous: {async_duration:.2f} seconds")
        print(f"  Speedup: {sync_duration / async_duration:.2f}x")

    finally:
        # FIX: the synchronous client was never closed, leaking its
        # connection pool; close both clients.
        sync_client.close()
        await async_client.close()

asyncio.run(performance_comparison())

Install with Tessl CLI

npx tessl i tessl/pypi-opensearch-py

docs

async-operations.md

authentication.md

core-client.md

document-modeling.md

dsl-queries.md

helper-functions.md

index.md

namespaced-apis.md

plugin-apis.md

tile.json