Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities
—
Essential OpenSearch operations for document management, searching, and basic cluster interactions. These are the fundamental methods available on both the synchronous OpenSearch and asynchronous AsyncOpenSearch client classes.
Test connectivity and retrieve basic cluster information.
def ping(self, **kwargs):
"""
Test connection to the cluster.
Parameters:
- request_timeout: Request timeout in seconds
Returns:
bool: True if connection is successful
Raises:
ConnectionError: If connection fails
"""
def info(self, **kwargs):
"""
Get basic information about the cluster.
Parameters:
- request_timeout: Request timeout in seconds
Returns:
dict: Cluster information including version, name, and tagline
"""Create and update documents in OpenSearch indices.
def index(self, index, body, id=None, **kwargs):
"""
Index a document.
Parameters:
- index (str): Target index name
- body (dict): Document source data
- id (str, optional): Document ID (auto-generated if not provided)
- doc_type (str, optional): Document type (deprecated in newer versions)
- refresh (str/bool, optional): Refresh policy ('true', 'false', 'wait_for')
- routing (str, optional): Routing value
- timeout (str, optional): Operation timeout
- version (int, optional): Expected document version
- version_type (str, optional): Version type ('internal', 'external')
- if_seq_no (int, optional): Only perform if sequence number matches
- if_primary_term (int, optional): Only perform if primary term matches
- pipeline (str, optional): Ingest pipeline to use
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Response containing '_index', '_id', '_version', 'result', etc.
Raises:
RequestError: If indexing fails due to client error
ConflictError: If version conflict occurs
"""
def create(self, index, id, body, **kwargs):
"""
Create a new document. Fails if document already exists.
Parameters:
- index (str): Target index name
- id (str): Document ID
- body (dict): Document source data
- refresh (str/bool, optional): Refresh policy
- routing (str, optional): Routing value
- timeout (str, optional): Operation timeout
- pipeline (str, optional): Ingest pipeline to use
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Response containing creation details
Raises:
ConflictError: If document already exists
"""Retrieve documents and check for document existence.
def get(self, index, id, **kwargs):
"""
Retrieve a document by ID.
Parameters:
- index (str): Index name
- id (str): Document ID
- doc_type (str, optional): Document type (deprecated)
- stored_fields (list, optional): List of stored fields to retrieve
- _source (bool/list/str, optional): Source filtering
- _source_excludes (list, optional): Source fields to exclude
- _source_includes (list, optional): Source fields to include
- preference (str, optional): Preference for which shard to query
- realtime (bool, optional): Whether to perform realtime get
- refresh (bool, optional): Refresh before get
- routing (str, optional): Routing value
- version (int, optional): Expected document version
- version_type (str, optional): Version type
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Document data including '_source', '_id', '_version', etc.
Raises:
NotFoundError: If document is not found
"""
def exists(self, index, id, **kwargs):
"""
Check if a document exists.
Parameters:
- index (str): Index name
- id (str): Document ID
- doc_type (str, optional): Document type (deprecated)
- preference (str, optional): Preference for which shard to query
- realtime (bool, optional): Whether to perform realtime check
- refresh (bool, optional): Refresh before check
- routing (str, optional): Routing value
- request_timeout (float, optional): Request timeout in seconds
Returns:
bool: True if document exists, False otherwise
"""
def get_source(self, index, id, **kwargs):
"""
Retrieve only the source of a document.
Parameters:
- index (str): Index name
- id (str): Document ID
- doc_type (str, optional): Document type (deprecated)
- _source_excludes (list, optional): Source fields to exclude
- _source_includes (list, optional): Source fields to include
- preference (str, optional): Preference for which shard to query
- realtime (bool, optional): Whether to perform realtime get
- refresh (bool, optional): Refresh before get
- routing (str, optional): Routing value
- version (int, optional): Expected document version
- version_type (str, optional): Version type
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Document source data only
Raises:
NotFoundError: If document is not found
"""Update existing documents with partial data or scripts.
def update(self, index, id, body, **kwargs):
"""
Update a document with partial data or script.
Parameters:
- index (str): Index name
- id (str): Document ID
- body (dict): Update body containing 'doc', 'script', or 'upsert'
- doc_type (str, optional): Document type (deprecated)
- _source (bool/list/str, optional): Source filtering for response
- _source_excludes (list, optional): Source fields to exclude from response
- _source_includes (list, optional): Source fields to include in response
- if_seq_no (int, optional): Only perform if sequence number matches
- if_primary_term (int, optional): Only perform if primary term matches
- lang (str, optional): Script language
- refresh (str/bool, optional): Refresh policy
- retry_on_conflict (int, optional): Number of times to retry on conflict
- routing (str, optional): Routing value
- timeout (str, optional): Operation timeout
- version (int, optional): Expected document version
- version_type (str, optional): Version type
- wait_for_active_shards (str/int, optional): Wait for active shards
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Update response containing result details
Raises:
NotFoundError: If document is not found
ConflictError: If version conflict occurs
"""
def update_by_query(self, index=None, body=None, **kwargs):
"""
Update documents matching a query.
Parameters:
- index (str/list, optional): Index name(s)
- body (dict, optional): Query and update specification
- doc_type (str/list, optional): Document type(s) (deprecated)
- analyzer (str, optional): Analyzer for query string
- analyze_wildcard (bool, optional): Whether to analyze wildcard terms
- conflicts (str, optional): What to do on conflicts ('abort' or 'proceed')
- default_operator (str, optional): Default operator for query string
- df (str, optional): Default field for query string
- expand_wildcards (str, optional): Wildcard expansion type
- from_ (int, optional): Starting document offset
- ignore_unavailable (bool, optional): Ignore unavailable indices
- allow_no_indices (bool, optional): Allow operation on no indices
- pipeline (str, optional): Ingest pipeline
- preference (str, optional): Preference for which shard to query
- q (str, optional): Query string
- refresh (bool, optional): Refresh after operation
- request_cache (bool, optional): Use request cache
- requests_per_second (float, optional): Throttling rate
- routing (str/list, optional): Routing value(s)
- scroll (str, optional): Scroll timeout
- scroll_size (int, optional): Scroll batch size
- search_type (str, optional): Search type
- search_timeout (str, optional): Search timeout
- size (int, optional): Number of documents to update
- sort (list, optional): Sort order
- terminate_after (int, optional): Maximum number of documents to process
- timeout (str, optional): Operation timeout
- version (bool, optional): Return document version
- version_type (bool, optional): Return version type
- wait_for_active_shards (str/int, optional): Wait for active shards
- wait_for_completion (bool, optional): Wait for operation completion
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Update results including updated count and conflicts
"""Delete documents individually or by query.
def delete(self, index, id, **kwargs):
"""
Delete a document by ID.
Parameters:
- index (str): Index name
- id (str): Document ID
- doc_type (str, optional): Document type (deprecated)
- if_seq_no (int, optional): Only perform if sequence number matches
- if_primary_term (int, optional): Only perform if primary term matches
- refresh (str/bool, optional): Refresh policy
- routing (str, optional): Routing value
- timeout (str, optional): Operation timeout
- version (int, optional): Expected document version
- version_type (str, optional): Version type
- wait_for_active_shards (str/int, optional): Wait for active shards
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Deletion response containing result details
Raises:
NotFoundError: If document is not found
"""
def delete_by_query(self, index, body=None, **kwargs):
"""
Delete documents matching a query.
Parameters:
- index (str/list): Index name(s)
- body (dict, optional): Query specification
- doc_type (str/list, optional): Document type(s) (deprecated)
- analyzer (str, optional): Analyzer for query string
- analyze_wildcard (bool, optional): Whether to analyze wildcard terms
- conflicts (str, optional): What to do on conflicts ('abort' or 'proceed')
- default_operator (str, optional): Default operator for query string
- df (str, optional): Default field for query string
- expand_wildcards (str, optional): Wildcard expansion type
- from_ (int, optional): Starting document offset
- ignore_unavailable (bool, optional): Ignore unavailable indices
- allow_no_indices (bool, optional): Allow operation on no indices
- preference (str, optional): Preference for which shard to query
- q (str, optional): Query string
- refresh (bool, optional): Refresh after operation
- request_cache (bool, optional): Use request cache
- requests_per_second (float, optional): Throttling rate
- routing (str/list, optional): Routing value(s)
- scroll (str, optional): Scroll timeout
- scroll_size (int, optional): Scroll batch size
- search_type (str, optional): Search type
- search_timeout (str, optional): Search timeout
- size (int, optional): Number of documents to delete
- sort (list, optional): Sort order
- terminate_after (int, optional): Maximum number of documents to process
- timeout (str, optional): Operation timeout
- version (bool, optional): Return document version
- version_type (bool, optional): Return version type
- wait_for_active_shards (str/int, optional): Wait for active shards
- wait_for_completion (bool, optional): Wait for operation completion
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Deletion results including deleted count and conflicts
"""Perform search queries across documents and indices.
def search(self, index=None, body=None, **kwargs):
"""
Execute a search query.
Parameters:
- index (str/list, optional): Index name(s) to search
- body (dict, optional): Search query body
- doc_type (str/list, optional): Document type(s) (deprecated)
- _source (bool/list/str, optional): Source filtering
- _source_excludes (list, optional): Source fields to exclude
- _source_includes (list, optional): Source fields to include
- allow_no_indices (bool, optional): Allow no indices to match
- allow_partial_search_results (bool, optional): Allow partial results
- analyzer (str, optional): Analyzer for query string
- analyze_wildcard (bool, optional): Analyze wildcard terms
- batched_reduce_size (int, optional): Batched reduce size
- ccs_minimize_roundtrips (bool, optional): Cross-cluster search optimization
- default_operator (str, optional): Default operator ('AND' or 'OR')
- df (str, optional): Default field for query string
- docvalue_fields (list, optional): Doc value fields to return
- expand_wildcards (str, optional): Wildcard expansion ('open', 'closed', 'hidden', 'none', 'all')
- explain (bool, optional): Return explanation for each hit
- from_ (int, optional): Starting document offset (default: 0)
- ignore_throttled (bool, optional): Ignore throttled indices
- ignore_unavailable (bool, optional): Ignore unavailable indices
- lenient (bool, optional): Ignore format-based query failures
- max_concurrent_shard_requests (int, optional): Max concurrent shard requests
- min_compatible_shard_node (str, optional): Minimum compatible shard node version
- preference (str, optional): Preference for which shard to query
- pre_filter_shard_size (int, optional): Pre-filter shard size threshold
- q (str, optional): Query string
- request_cache (bool, optional): Use request cache
- rest_total_hits_as_int (bool, optional): Return total hits as integer
- routing (str/list, optional): Routing value(s)
- scroll (str, optional): Scroll timeout for cursor-based pagination
- search_type (str, optional): Search type ('query_then_fetch', 'dfs_query_then_fetch')
- seq_no_primary_term (bool, optional): Return sequence number and primary term
- size (int, optional): Number of hits to return (default: 10)
- sort (list, optional): Sort order
- stats (list, optional): Statistics groups
- stored_fields (list, optional): Stored fields to return
- suggest_field (str, optional): Field to suggest on
- suggest_mode (str, optional): Suggest mode
- suggest_size (int, optional): Number of suggestions
- suggest_text (str, optional): Text to suggest on
- terminate_after (int, optional): Maximum number of documents to collect
- timeout (str, optional): Search timeout
- track_scores (bool, optional): Track scores for each hit
- track_total_hits (bool/int, optional): Track total hits
- typed_keys (bool, optional): Prefix aggregation names with type
- version (bool, optional): Return document version
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Search results containing 'hits', 'aggregations', 'took', etc.
Raises:
RequestError: If search query is malformed
"""
def count(self, index=None, body=None, **kwargs):
"""
Count documents matching a query.
Parameters:
- index (str/list, optional): Index name(s)
- body (dict, optional): Query body
- doc_type (str/list, optional): Document type(s) (deprecated)
- allow_no_indices (bool, optional): Allow no indices to match
- analyzer (str, optional): Analyzer for query string
- analyze_wildcard (bool, optional): Analyze wildcard terms
- default_operator (str, optional): Default operator
- df (str, optional): Default field
- expand_wildcards (str, optional): Wildcard expansion
- ignore_throttled (bool, optional): Ignore throttled indices
- ignore_unavailable (bool, optional): Ignore unavailable indices
- lenient (bool, optional): Ignore format-based failures
- min_score (float, optional): Minimum score threshold
- preference (str, optional): Preference for which shard to query
- q (str, optional): Query string
- routing (str/list, optional): Routing value(s)
- terminate_after (int, optional): Maximum documents to count
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Count result containing 'count' field
"""
def scroll(self, scroll_id, scroll='5m', **kwargs):
"""
Continue scrolling through search results.
Parameters:
- scroll_id (str): Scroll ID from previous search/scroll request
- scroll (str, optional): Scroll timeout (default: '5m')
- rest_total_hits_as_int (bool, optional): Return total hits as integer
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Next batch of search results
Raises:
NotFoundError: If scroll ID is invalid or expired
"""
def clear_scroll(self, scroll_id=None, body=None, **kwargs):
"""
Clear scroll context to free resources.
Parameters:
- scroll_id (str/list, optional): Scroll ID(s) to clear
- body (dict, optional): Body containing scroll_id list
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Clear scroll response
"""Perform multiple operations in a single request for improved performance.
def bulk(self, body, index=None, **kwargs):
"""
Perform multiple index, create, update, or delete operations.
Parameters:
- body (list/str): List of operations or newline-delimited JSON string
- index (str, optional): Default index for operations without index
- doc_type (str, optional): Default document type (deprecated)
- pipeline (str, optional): Default ingest pipeline
- refresh (str/bool, optional): Refresh policy
- routing (str, optional): Default routing value
- timeout (str, optional): Operation timeout
- wait_for_active_shards (str/int, optional): Wait for active shards
- require_alias (bool, optional): Require target to be an alias
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Bulk response containing 'items' list with operation results
Each operation format:
{"index": {"_index": "test", "_id": "1"}}
{"field1": "value1"}
{"create": {"_index": "test", "_id": "2"}}
{"field1": "value2"}
{"update": {"_index": "test", "_id": "1"}}
{"doc": {"field1": "updated_value"}}
{"delete": {"_index": "test", "_id": "2"}}
"""Retrieve multiple documents in a single request.
def mget(self, body, index=None, **kwargs):
"""
Retrieve multiple documents by ID.
Parameters:
- body (dict): Multi-get request body with 'docs' or 'ids' field
- index (str, optional): Default index name
- doc_type (str, optional): Default document type (deprecated)
- _source (bool/list/str, optional): Source filtering
- _source_excludes (list, optional): Source fields to exclude
- _source_includes (list, optional): Source fields to include
- preference (str, optional): Preference for which shard to query
- realtime (bool, optional): Whether to perform realtime get
- refresh (bool, optional): Refresh before get
- routing (str, optional): Default routing value
- stored_fields (list, optional): Stored fields to retrieve
- request_timeout (float, optional): Request timeout in seconds
Body format:
{
"docs": [
{"_index": "test", "_id": "1"},
{"_index": "test", "_id": "2", "_source": ["field1"]}
]
}
Or with default index:
{
"ids": ["1", "2", "3"]
}
Returns:
dict: Multi-get response with 'docs' array containing individual responses
"""
def msearch(self, body, index=None, **kwargs):
"""
Execute multiple search queries.
Parameters:
- body (list/str): List of search requests or newline-delimited JSON
- index (str/list, optional): Default index name(s)
- doc_type (str/list, optional): Default document type(s) (deprecated)
- ccs_minimize_roundtrips (bool, optional): Cross-cluster search optimization
- max_concurrent_searches (int, optional): Maximum concurrent searches
- rest_total_hits_as_int (bool, optional): Return total hits as integer
- typed_keys (bool, optional): Prefix aggregation names with type
- request_timeout (float, optional): Request timeout in seconds
Body format (alternating headers and bodies):
[
{"index": "test1"},
{"query": {"match_all": {}}},
{"index": "test2", "type": "doc"},
{"query": {"match": {"field": "value"}}}
]
Returns:
dict: Multi-search response with 'responses' array containing individual results
"""Manage and execute search templates for reusable queries.
def put_script(self, id, body, context=None, **kwargs):
"""
Store a script for later execution.
Parameters:
- id (str): Script ID
- body (dict): Script definition
- context (str, optional): Script context ('search', 'filter', 'update', etc.)
- master_timeout (str, optional): Master node timeout
- timeout (str, optional): Operation timeout
- request_timeout (float, optional): Request timeout in seconds
Body format:
{
"script": {
"lang": "painless",
"source": "Math.log(_score * 2) + params.factor",
"params": {"factor": 1.2}
}
}
Returns:
dict: Script creation response
"""
def get_script(self, id, **kwargs):
"""
Retrieve a stored script.
Parameters:
- id (str): Script ID
- master_timeout (str, optional): Master node timeout
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Script definition
"""
def delete_script(self, id, **kwargs):
"""
Delete a stored script.
Parameters:
- id (str): Script ID
- master_timeout (str, optional): Master node timeout
- timeout (str, optional): Operation timeout
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Deletion response
"""
def render_template(self, body=None, id=None, **kwargs):
"""
Render a search template with parameters.
Parameters:
- body (dict, optional): Template and parameters
- id (str, optional): Stored template ID
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Rendered template output
"""Additional search capabilities for specialized use cases.
def explain(self, index, id, body=None, **kwargs):
"""
Explain why a document matches or doesn't match a query.
Parameters:
- index (str): Index name
- id (str): Document ID
- body (dict, optional): Query to explain
- doc_type (str, optional): Document type (deprecated)
- _source (bool/list/str, optional): Source filtering
- _source_excludes (list, optional): Source fields to exclude
- _source_includes (list, optional): Source fields to include
- analyzer (str, optional): Analyzer for query string
- analyze_wildcard (bool, optional): Analyze wildcard terms
- default_operator (str, optional): Default operator
- df (str, optional): Default field
- lenient (bool, optional): Ignore format-based failures
- parent (str, optional): Parent document ID
- preference (str, optional): Preference for which shard to query
- q (str, optional): Query string
- routing (str, optional): Routing value
- stored_fields (list, optional): Stored fields to return
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Explanation of query scoring
"""
def field_caps(self, index=None, **kwargs):
"""
Get field capabilities across indices.
Parameters:
- index (str/list, optional): Index name(s)
- fields (str/list, optional): Field name(s) to get capabilities for
- allow_no_indices (bool, optional): Allow no indices to match
- expand_wildcards (str, optional): Wildcard expansion
- ignore_unavailable (bool, optional): Ignore unavailable indices
- include_unmapped (bool, optional): Include unmapped fields
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Field capabilities information
"""
def rank_eval(self, body, index=None, **kwargs):
"""
Evaluate search query ranking quality.
Parameters:
- body (dict): Ranking evaluation specification
- index (str/list, optional): Index name(s)
- allow_no_indices (bool, optional): Allow no indices to match
- expand_wildcards (str, optional): Wildcard expansion
- ignore_unavailable (bool, optional): Ignore unavailable indices
- request_timeout (float, optional): Request timeout in seconds
Returns:
dict: Ranking evaluation results
"""from opensearchpy import OpenSearch
client = OpenSearch([{'host': 'localhost', 'port': 9200}])
# Index a document
doc = {'title': 'Test Document', 'content': 'This is a test'}
response = client.index(index='test-index', id='1', body=doc)
print(f"Indexed document: {response['result']}")
# Get the document
doc = client.get(index='test-index', id='1')
print(f"Retrieved: {doc['_source']}")
# Update the document
update_body = {'doc': {'content': 'Updated content'}}
client.update(index='test-index', id='1', body=update_body)
# Search for documents
search_body = {
'query': {
'match': {'title': 'Test'}
}
}
results = client.search(index='test-index', body=search_body)
print(f"Found {results['hits']['total']['value']} documents")
# Delete the document
client.delete(index='test-index', id='1')# Bulk indexing
actions = [
{'index': {'_index': 'test-index', '_id': '1'}},
{'title': 'Document 1', 'content': 'Content 1'},
{'index': {'_index': 'test-index', '_id': '2'}},
{'title': 'Document 2', 'content': 'Content 2'},
{'update': {'_index': 'test-index', '_id': '1'}},
{'doc': {'status': 'updated'}},
{'delete': {'_index': 'test-index', '_id': '2'}}
]
response = client.bulk(body=actions)
for item in response['items']:
for operation, result in item.items():
print(f"{operation}: {result['result']}")# Start scroll search
search_body = {
'query': {'match_all': {}},
'sort': ['_doc']
}
response = client.search(
index='large-index',
body=search_body,
scroll='5m',
size=1000
)
scroll_id = response['_scroll_id']
hits = response['hits']['hits']
# Process first batch
for hit in hits:
print(hit['_source'])
# Continue scrolling
while len(hits) > 0:
response = client.scroll(scroll_id=scroll_id, scroll='5m')
hits = response['hits']['hits']
for hit in hits:
print(hit['_source'])
# Clear scroll context
client.clear_scroll(scroll_id=scroll_id)Install with Tessl CLI
npx tessl i tessl/pypi-opensearch-py