Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities
—
Asynchronous client operations using Python's asyncio for high-performance applications requiring concurrent OpenSearch operations. The async client provides the same API as the synchronous client but with async/await support.
Main asynchronous client class with the same methods as the synchronous client.
class AsyncOpenSearch:
    """Asynchronous OpenSearch client.

    Mirrors the synchronous ``OpenSearch`` client's API, but every
    operation is a coroutine that must be awaited.
    """

    def __init__(self, hosts=None, **kwargs):
        """
        Initialize async OpenSearch client.

        Parameters: Same as OpenSearch client
        """

    async def ping(self, **kwargs):
        """Test connection to the cluster (async)."""

    async def info(self, **kwargs):
        """Get basic cluster information (async)."""

    async def search(self, index=None, body=None, **kwargs):
        """Execute search query (async)."""

    async def index(self, index, body, id=None, **kwargs):
        """Index a document (async)."""

    async def get(self, index, id, **kwargs):
        """Retrieve document by ID (async)."""

    async def update(self, index, id, body, **kwargs):
        """Update document (async)."""

    async def delete(self, index, id, **kwargs):
        """Delete document (async)."""

    async def bulk(self, body, index=None, **kwargs):
        """Bulk operations (async)."""

    async def count(self, index=None, body=None, **kwargs):
        """Count documents (async)."""

    async def scroll(self, scroll_id, scroll='5m', **kwargs):
        """Continue scrolling (async)."""

    async def clear_scroll(self, scroll_id=None, **kwargs):
        """Clear scroll context (async)."""

    async def close(self):
        """Close the client and cleanup resources."""

# Asynchronous versions of helper functions for bulk operations and scanning.
async def async_bulk(client, actions, **kwargs):
    """
    Async bulk operations.

    Parameters: Same as sync bulk() function
    Returns:
        Tuple of (success_count, failed_operations)
    """

async def async_streaming_bulk(client, actions, **kwargs):
    """
    Async streaming bulk operations.

    Parameters: Same as sync streaming_bulk() function
    Async yields:
        Tuples of (success, action_result) for each operation
    """

async def async_scan(client, query=None, scroll='5m', **kwargs):
    """
    Async scan for large result sets.

    Parameters: Same as sync scan() function
    Async yields:
        Individual document hits from search results
    """

async def async_reindex(client, source_index, target_index, **kwargs):
    """
    Async reindexing operations.

    Parameters: Same as sync reindex() function
    Returns:
        Tuple of (success_count, failed_operations)
    """

import asyncio
from opensearchpy import AsyncOpenSearch
async def basic_async_example():
    """Connect, index one document, and search for it with the async client."""
    # Create async client
    client = AsyncOpenSearch(
        hosts=[{'host': 'localhost', 'port': 9200}],
        use_ssl=True,
        verify_certs=False
    )
    try:
        # Test connection
        response = await client.ping()
        print(f"Connection successful: {response}")
        # Get cluster info
        info = await client.info()
        print(f"Cluster: {info['cluster_name']}")
        # Index a document
        doc = {
            'title': 'Async Document',
            'content': 'This document was indexed asynchronously',
            'timestamp': '2024-01-01T00:00:00Z'
        }
        result = await client.index(
            index='async-index',
            id='async-doc-1',
            body=doc
        )
        print(f"Document indexed: {result['result']}")
        # Search for documents
        search_body = {
            'query': {
                'match': {
                    'title': 'Async'
                }
            }
        }
        search_result = await client.search(
            index='async-index',
            body=search_body
        )
        print(f"Found {search_result['hits']['total']['value']} documents")
    finally:
        # Always close the client so pooled connections are released
        await client.close()

# Run async example
asyncio.run(basic_async_example())

import asyncio
from opensearchpy import AsyncOpenSearch
async def concurrent_operations_example():
    """Index ten documents concurrently and tally successes vs. failures."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    try:
        # Create multiple coroutines
        tasks = []
        # Index multiple documents concurrently
        for i in range(10):
            doc = {
                'id': i,
                'title': f'Concurrent Document {i}',
                'content': f'Content for document {i}'
            }
            task = client.index(
                index='concurrent-index',
                id=str(i),
                body=doc
            )
            tasks.append(task)
        # Execute all indexing operations concurrently; return_exceptions
        # keeps one failure from cancelling the rest of the batch
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = 0
        failed = 0
        for result in results:
            if isinstance(result, Exception):
                failed += 1
                print(f"Failed: {result}")
            else:
                successful += 1
        print(f"Concurrent indexing: {successful} successful, {failed} failed")
    finally:
        await client.close()

asyncio.run(concurrent_operations_example())

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk
async def async_bulk_example():
    """Index 1000 documents in chunks via the async_bulk helper."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    try:
        # Prepare bulk actions
        actions = [
            {
                '_op_type': 'index',
                '_index': 'bulk-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Bulk Document {i}',
                    'value': i * 10,
                    'category': 'async-bulk'
                }
            }
            for i in range(1000)
        ]
        # Execute async bulk operations
        success_count, failed_ops = await async_bulk(
            client,
            actions,
            chunk_size=100,
            max_retries=3
        )
        print(f"Bulk operation: {success_count} successful")
        if failed_ops:
            print(f"Failed operations: {len(failed_ops)}")
    finally:
        await client.close()

asyncio.run(async_bulk_example())

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_streaming_bulk
async def async_streaming_bulk_example():
    """Stream documents from an async generator through async_streaming_bulk."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])

    async def generate_docs():
        """Async generator for documents."""
        for i in range(500):
            yield {
                '_op_type': 'index',
                '_index': 'streaming-async-index',
                '_id': str(i),
                '_source': {
                    'title': f'Streaming Document {i}',
                    'timestamp': '2024-01-01T00:00:00Z',
                    'value': i
                }
            }
            # Simulate async data processing
            if i % 50 == 0:
                await asyncio.sleep(0.1)

    try:
        processed = 0
        errors = []
        # Stream bulk operations
        async for success, info in async_streaming_bulk(
            client,
            generate_docs(),
            chunk_size=50
        ):
            if success:
                processed += 1
            else:
                errors.append(info)
            if processed % 100 == 0:
                print(f"Processed: {processed}, Errors: {len(errors)}")
        print(f"Streaming bulk completed: {processed} processed, {len(errors)} errors")
    finally:
        await client.close()

asyncio.run(async_streaming_bulk_example())

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_scan
async def async_scan_example():
    """Scan a large index asynchronously, counting documents as they arrive."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    try:
        query = {
            'query': {
                'range': {
                    'value': {
                        'gte': 0,
                        'lt': 1000
                    }
                }
            },
            # _doc sort is the most efficient order for scroll/scan
            'sort': ['_doc']
        }
        processed_count = 0
        # Async scan through large result set
        async for doc in async_scan(
            client,
            query=query,
            index='large-async-index',
            size=100,
            scroll='5m'
        ):
            # Process each document
            processed_count += 1
            # Log progress
            if processed_count % 1000 == 0:
                print(f"Scanned {processed_count} documents")
        print(f"Async scan completed: {processed_count} documents processed")
    finally:
        await client.close()

asyncio.run(async_scan_example())

import asyncio
from opensearchpy import AsyncOpenSearch
class AsyncOpenSearchManager:
    """Async context manager owning an AsyncOpenSearch client's lifecycle.

    The client is created lazily on ``__aenter__`` and closed on
    ``__aexit__``; exceptions are never suppressed.
    """

    def __init__(self, **kwargs):
        # Store the constructor arguments; defer client creation to entry.
        self.client_kwargs = kwargs
        self.client = None

    async def __aenter__(self):
        # Build the client from the stored keyword arguments.
        self.client = AsyncOpenSearch(**self.client_kwargs)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Release connections if a client was actually created.
        if self.client:
            await self.client.close()
async def context_manager_example():
    """Demonstrate automatic client cleanup via an async context manager."""
    # Use async context manager for automatic cleanup
    async with AsyncOpenSearchManager(
        hosts=[{'host': 'localhost', 'port': 9200}]
    ) as client:
        # Perform operations
        info = await client.info()
        print(f"Connected to: {info['cluster_name']}")
        # Index some documents
        tasks = []
        for i in range(5):
            task = client.index(
                index='context-manager-index',
                id=str(i),
                body={'value': i, 'title': f'Document {i}'}
            )
            tasks.append(task)
        results = await asyncio.gather(*tasks)
        print(f"Indexed {len(results)} documents")
        # Search
        search_result = await client.search(
            index='context-manager-index',
            body={'query': {'match_all': {}}}
        )
        print(f"Found {search_result['hits']['total']['value']} documents")
    # Client is automatically closed here
    print("Client closed automatically")

asyncio.run(context_manager_example())

import asyncio
from opensearchpy import AsyncOpenSearch
async def connection_pool_example():
    """Run 100 mixed operations concurrently over a multi-node connection pool."""
    # Client with connection pool for high concurrency
    client = AsyncOpenSearch(
        hosts=[
            {'host': 'node1.cluster.com', 'port': 9200},
            {'host': 'node2.cluster.com', 'port': 9200},
            {'host': 'node3.cluster.com', 'port': 9200}
        ],
        # Connection pool settings
        maxsize=20,  # Maximum connections per host
        # Health checking
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
        # Retry settings
        max_retries=3,
        retry_on_timeout=True
    )
    try:
        # Create many concurrent tasks
        tasks = []
        for i in range(100):
            # Mix of different operations
            if i % 3 == 0:
                task = client.index(
                    index='pool-index',
                    id=str(i),
                    body={'value': i}
                )
            elif i % 3 == 1:
                task = client.search(
                    index='pool-index',
                    body={'query': {'match_all': {}}}
                )
            else:
                task = client.count(index='pool-index')
            tasks.append(task)
        # Execute all tasks concurrently, timing with the running loop's
        # monotonic clock (asyncio.get_event_loop() inside a coroutine is
        # deprecated since Python 3.10)
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = loop.time()
        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = len(results) - successful
        print(f"Concurrent operations with pool:")
        print(f" Total: {len(results)}")
        print(f" Successful: {successful}")
        print(f" Failed: {failed}")
        print(f" Time: {end_time - start_time:.2f} seconds")
    finally:
        await client.close()

asyncio.run(connection_pool_example())

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.exceptions import (
NotFoundError,
RequestError,
ConnectionError,
TransportError
)
async def error_handling_example():
    """Show per-task error handling with gather plus client-level error handling."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    try:
        # Example of handling different types of errors
        tasks = []
        # This will succeed
        tasks.append(client.index(
            index='error-test',
            id='success',
            body={'status': 'success'}
        ))
        # This might fail with validation error
        tasks.append(client.index(
            index='error-test',
            id='malformed',
            body={'date_field': 'invalid-date-format'}
        ))

        # Try to get non-existent document, converting the expected
        # NotFoundError into a regular result instead of an exception
        async def get_nonexistent():
            try:
                return await client.get(index='error-test', id='nonexistent')
            except NotFoundError:
                return {'error': 'Document not found'}

        tasks.append(get_nonexistent())
        # Execute with error handling
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {type(result).__name__}: {result}")
            else:
                print(f"Task {i} succeeded")
    except ConnectionError as e:
        print(f"Connection failed: {e}")
    except TransportError as e:
        print(f"Transport error: {e}")
    finally:
        await client.close()

asyncio.run(error_handling_example())

import asyncio
import time
from opensearchpy import OpenSearch, AsyncOpenSearch
async def performance_comparison():
# Synchronous client
sync_client = OpenSearch([{'host': 'localhost', 'port': 9200}])
# Asynchronous client
async_client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
num_operations = 50
try:
# Synchronous operations
print("Running synchronous operations...")
sync_start = time.time()
for i in range(num_operations):
sync_client.index(
index='perf-test',
id=f'sync-{i}',
body={'value': i, 'type': 'sync'}
)
sync_end = time.time()
sync_duration = sync_end - sync_start
# Asynchronous operations
print("Running asynchronous operations...")
async_start = time.time()
tasks = [
async_client.index(
index='perf-test',
id=f'async-{i}',
body={'value': i, 'type': 'async'}
)
for i in range(num_operations)
]
await asyncio.gather(*tasks)
async_end = time.time()
async_duration = async_end - async_start
# Results
print(f"\nPerformance comparison ({num_operations} operations):")
print(f" Synchronous: {sync_duration:.2f} seconds")
print(f" Asynchronous: {async_duration:.2f} seconds")
print(f" Speedup: {sync_duration / async_duration:.2f}x")
finally:
await async_client.close()
asyncio.run(performance_comparison())Install with Tessl CLI
npx tessl i tessl/pypi-opensearch-py