# Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities.
#
# High-level utility functions for common operations such as bulk indexing, scanning large result sets, and data reindexing, with built-in error handling and performance optimizations. These functions provide simplified interfaces for complex operations.
# Efficient bulk processing for high-throughput document operations.
def bulk(client, actions, index=None, doc_type=None, **kwargs):
    """Perform bulk indexing, updating, and deleting operations.

    Args:
        client: OpenSearch client instance.
        actions: Iterable of action dictionaries or a generator.
        index (str, optional): Default index for actions without ``_index``.
        doc_type (str, optional): Default document type (deprecated).

    Keyword Args:
        chunk_size (int): Number of docs per chunk (default: 500).
        max_chunk_bytes (int): Maximum size per chunk in bytes.
        thread_count (int): Number of parallel threads (default: 4).
        queue_size (int): Size of the task queue (default: 4).
        refresh (str or bool): Refresh policy for operations.
        timeout (str): Request timeout.
        max_retries (int): Maximum number of retries (default: 0).
        initial_backoff (int): Initial backoff time in seconds (default: 2).
        max_backoff (int): Maximum backoff time in seconds (default: 600).
        yield_ok (bool): Yield successful operations (default: True).

    Action format::

        {
            '_op_type': 'index',  # 'index', 'create', 'update', 'delete'
            '_index': 'my-index',
            '_id': 'doc-id',
            '_source': {'field': 'value'}  # For index/create/update
        }

    Returns:
        Iterator yielding tuples of (success_count, failed_actions).

    Raises:
        BulkIndexError: If there are failed operations and errors are not ignored.
    """
def async_bulk(client, actions, **kwargs):
    """Asynchronous version of bulk operations.

    Parameters are the same as for :func:`bulk`.

    Returns:
        Async iterator yielding operation results.
    """
def streaming_bulk(client, actions, **kwargs):
    """Streaming bulk operations that yield results as they complete.

    Parameters are the same as for :func:`bulk`.

    Yields:
        Tuples of (success, action_result) for each operation.
    """
def async_streaming_bulk(client, actions, **kwargs):
    """Asynchronous streaming bulk operations.

    Parameters are the same as for :func:`bulk`.

    Async yields:
        Tuples of (success, action_result) for each operation.
    """
def parallel_bulk(client, actions, thread_count=4, **kwargs):
    """Parallel bulk operations using threading for improved performance.

    Args:
        client: OpenSearch client instance.
        actions: Iterable of action dictionaries.
        thread_count (int): Number of parallel threads (default: 4).

    Other keyword arguments are the same as for :func:`bulk`.

    Yields:
        Tuples of (success, action_result) for each operation.
    """

# Efficient scanning for processing large result sets.
def scan(client, query=None, scroll='5m', **kwargs):
    """Scan search results for large datasets using the scroll API.

    Args:
        client: OpenSearch client instance.
        query (dict, optional): Search query body.
        scroll (str, optional): Scroll timeout (default: '5m').

    Keyword Args:
        index (str or list): Index name(s).
        doc_type (str or list): Document type(s) (deprecated).
        size (int): Number of results per shard (default: 1000).
        request_timeout (float): Request timeout in seconds.
        clear_scroll (bool): Clear scroll context on completion (default: True).
        scroll_kwargs (dict): Additional scroll parameters.
        preserve_order (bool): Preserve result order (default: False).

    Query format::

        {
            'query': {
                'match_all': {}
            },
            'sort': ['_doc']  # Recommended for performance
        }

    Yields:
        Individual document hits from search results.

    Raises:
        ScanError: If the scan operation fails.
    """
def async_scan(client, query=None, scroll='5m', **kwargs):
    """Asynchronous version of scan operations.

    Parameters are the same as for :func:`scan`.

    Async yields:
        Individual document hits from search results.
    """

# Copy documents between indices with optional transformations.
def reindex(client, source_index, target_index, query=None, **kwargs):
    """Reindex documents from a source index into a target index.

    Args:
        client: OpenSearch client instance.
        source_index (str): Source index name.
        target_index (str): Target index name.
        query (dict, optional): Query to filter source documents.

    Keyword Args:
        chunk_size (int): Bulk operation chunk size (default: 500).
        scroll (str): Scroll timeout (default: '5m').
        op_type (str): Operation type ('index' or 'create', default: 'index').
        transform (callable): Function to transform documents.
        target_client: Different client for the target index.

    Transform function signature::

        def transform_doc(doc):
            # Modify doc['_source'], doc['_id'], etc.
            return doc

    Returns:
        Tuple of (success_count, failed_operations).

    Raises:
        ReindexError: If reindexing fails.
    """
def async_reindex(client, source_index, target_index, **kwargs):
    """Asynchronous version of reindex operations.

    Parameters are the same as for :func:`reindex`.

    Returns:
        Tuple of (success_count, failed_operations).
    """

# Additional helper functions for common operations.
def expand_action(data):
    """Expand a single document into the bulk action format.

    Args:
        data: Document data or an action dictionary.

    Returns:
        A properly formatted bulk action.
    """
def _chunk_actions(actions, chunk_size, max_chunk_bytes):
"""
Internal function to chunk actions for bulk operations.
Parameters:
- actions: Iterable of actions
- chunk_size: Maximum actions per chunk
- max_chunk_bytes: Maximum bytes per chunk
Yields:
Chunks of actions
"""
def _process_bulk_chunk(client, chunk, **kwargs):
    """Internal: send a single chunk of actions to the bulk endpoint.

    Args:
        client: OpenSearch client instance.
        chunk: List of actions to process.
        kwargs: Additional bulk parameters.

    Returns:
        Processed results.
    """

from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Prepare bulk actions covering index, update, and delete operations.
actions = [
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'title': 'Laptop Computer',
            'price': 999.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'index',
        '_index': 'products',
        '_id': '2',
        '_source': {
            'title': 'Wireless Mouse',
            'price': 29.99,
            'category': 'Electronics'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'products',
        '_id': '1',
        '_source': {
            'doc': {
                'in_stock': True
            }
        }
    },
    {
        '_op_type': 'delete',
        '_index': 'products',
        '_id': '3'
    }
]

# Execute bulk operations; bulk() returns (success_count, failed_actions).
successes, failures = bulk(
    client,
    actions,
    chunk_size=100,
    thread_count=4,
    timeout='60s'
)

print(f"Successful operations: {successes}")
if failures:
    print(f"Failed operations: {len(failures)}")
    for failure in failures:
        print(f" Error: {failure}")

from opensearchpy.helpers import streaming_bulk
def generate_docs():
    """Yield index actions for the 'large-dataset' index from a data source."""
    for doc_id in range(10000):
        yield {
            '_op_type': 'index',
            '_index': 'large-dataset',
            '_id': str(doc_id),
            '_source': {
                'id': doc_id,
                'value': f'Document {doc_id}',
                'timestamp': '2024-01-01T00:00:00Z'
            }
        }
# Stream bulk operations, inspecting each per-document result as it completes.
for success, info in streaming_bulk(
    client,
    generate_docs(),
    chunk_size=500,
    max_retries=3
):
    if not success:
        print(f"Failed to index: {info}")
    else:
        print(f"Indexed document: {info['index']['_id']}")

from opensearchpy.helpers import scan
# Scan all documents in an index that fall within a timestamp range.
query = {
    'query': {
        'range': {
            'timestamp': {
                'gte': '2024-01-01',
                'lte': '2024-12-31'
            }
        }
    },
    'sort': ['_doc']  # More efficient than default scoring
}

total_processed = 0
for doc in scan(
    client,
    query=query,
    index='large-index',
    size=1000,  # Documents per shard per request
    scroll='10m'
):
    # Process each document (process_document is a user-supplied callback,
    # not defined in this example).
    process_document(doc['_source'])
    total_processed += 1
    if total_processed % 10000 == 0:
        print(f"Processed {total_processed} documents")

print(f"Total processed: {total_processed} documents")

from opensearchpy.helpers import reindex
def transform_document(doc):
    """Transform one document in place while it is being reindexed.

    Adds a processing timestamp, renames 'old_field' to 'new_field',
    drops 'temp_field', and prefixes the document ID with 'new_'.
    """
    source = doc['_source']
    # Add new fields
    source['processed_at'] = '2024-01-01T00:00:00Z'
    # Rename fields
    if 'old_field' in source:
        source['new_field'] = source.pop('old_field')
    # Filter out unwanted fields
    source.pop('temp_field', None)
    # Change document ID format
    doc['_id'] = f"new_{doc['_id']}"
    return doc
# Reindex only 'active' documents, transforming each one on the way through.
query = {
    'query': {
        'bool': {
            'must': [
                {'term': {'status': 'active'}}
            ]
        }
    }
}

success_count, failed_ops = reindex(
    client,
    source_index='old-index',
    target_index='new-index',
    query=query,
    transform=transform_document,
    chunk_size=200
)

print(f"Successfully reindexed: {success_count} documents")
if failed_ops:
    print(f"Failed operations: {len(failed_ops)}")

from opensearchpy.helpers import parallel_bulk
import json

def read_json_file(filename):
    """Yield one 'imported-data' index action per line of a JSON-lines file."""
    with open(filename, 'r') as handle:
        for raw_line in handle:
            yield {
                '_op_type': 'index',
                '_index': 'imported-data',
                '_source': json.loads(raw_line)
            }
# Process a large file with parallel bulk indexing.
processed = 0
errors = []
for success, info in parallel_bulk(
    client,
    read_json_file('large_dataset.jsonl'),
    thread_count=8,
    chunk_size=1000,
    max_retries=3,
    initial_backoff=2,
    max_backoff=600
):
    if success:
        processed += 1
        # Report progress every 10k successes. Guarding the print here (rather
        # than after the if/else) avoids re-printing the same line for every
        # failure while 'processed' sits on a multiple of 10000.
        if processed % 10000 == 0:
            print(f"Processed: {processed}, Errors: {len(errors)}")
    else:
        errors.append(info)

print(f"Final: Processed {processed}, Errors: {len(errors)}")

import asyncio
from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

async def async_bulk_example():
    """Index 1000 small documents asynchronously, always closing the client."""
    client = AsyncOpenSearch([{'host': 'localhost', 'port': 9200}])
    actions = [
        {
            '_op_type': 'index',
            '_index': 'async-index',
            '_id': str(i),
            '_source': {'value': i}
        }
        for i in range(1000)
    ]
    try:
        # Async bulk operations
        success_count, failed_ops = await async_bulk(
            client,
            actions,
            chunk_size=100
        )
        print(f"Async bulk: {success_count} successful, {len(failed_ops)} failed")
    finally:
        # Close the client even if async_bulk raises, so the connection
        # pool is not leaked.
        await client.close()

# Run async example
asyncio.run(async_bulk_example())

from opensearchpy.helpers import bulk
from opensearchpy.exceptions import BulkIndexError, ConnectionError

def robust_bulk_index(client, documents, max_attempts=3):
    """Bulk-index documents with application-level retries on top of bulk().

    Args:
        client: OpenSearch client instance.
        documents: Iterable of plain document dicts, indexed into 'robust-index'.
        max_attempts (int): Number of application-level attempts (default: 3).

    Returns:
        Tuple of (success_count, failed_operations) from the last attempt.

    Raises:
        BulkIndexError, ConnectionError: Re-raised if the final attempt fails.
    """
    import time  # hoisted out of the retry loop instead of importing per-exception

    actions = [
        {
            '_op_type': 'index',
            '_index': 'robust-index',
            '_source': doc
        }
        for doc in documents
    ]
    # Pre-initialize the result so the final return cannot hit a NameError
    # (e.g. when max_attempts is 0 and the loop body never runs).
    success_count, failed_ops = 0, actions
    for attempt in range(max_attempts):
        try:
            success_count, failed_ops = bulk(
                client,
                actions,
                max_retries=2,
                initial_backoff=2,
                max_backoff=60
            )
            if not failed_ops:
                print(f"All {success_count} documents indexed successfully")
                return success_count, []
            # Retry only the operations that failed last time.
            actions = failed_ops
            print(f"Attempt {attempt + 1}: {len(failed_ops)} operations failed, retrying...")
        except (BulkIndexError, ConnectionError) as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff before the next attempt.
            time.sleep(2 ** attempt)
    return success_count, failed_ops

# Use robust bulk indexing
documents = [{'id': i, 'data': f'value_{i}'} for i in range(1000)]
success, failures = robust_bulk_index(client, documents)

# Install with Tessl CLI
# npx tessl i tessl/pypi-opensearch-py