Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities
—
Specialized client interfaces for different aspects of OpenSearch cluster management. These namespaced APIs are accessible as properties on both OpenSearch and AsyncOpenSearch client instances, providing organized access to related functionality.
Comprehensive index lifecycle management including creation, deletion, mapping management, and settings configuration.
class IndicesClient:
    """Index lifecycle operations: creation, deletion, mappings, and settings.

    Signature reference only: every method body consists of its docstring.
    """

    def create(self, index, body=None, **kwargs):
        """
        Create an index with optional settings and mappings.

        Parameters:
        - index (str): Index name
        - body (dict, optional): Index settings and mappings
        - wait_for_active_shards (str/int, optional): Wait for active shards
        - timeout (str, optional): Operation timeout
        - master_timeout (str, optional): Master node timeout

        Returns:
        dict: Index creation response
        """

    def delete(self, index, **kwargs):
        """Delete one or more indices."""

    def exists(self, index, **kwargs):
        """Check if an index exists."""

    def get(self, index, **kwargs):
        """Get index information including settings and mappings."""

    def get_mapping(self, index=None, **kwargs):
        """Get mapping definitions for indices."""

    def put_mapping(self, index, body, **kwargs):
        """Update mapping for existing indices."""

    def get_settings(self, index=None, name=None, **kwargs):
        """Get index settings."""

    def put_settings(self, body, index=None, **kwargs):
        """Update index settings."""

    def refresh(self, index=None, **kwargs):
        """Refresh indices to make recent changes visible."""

    def flush(self, index=None, **kwargs):
        """Flush indices to ensure data is written to disk."""

    def forcemerge(self, index=None, **kwargs):
        """Force merge index segments."""

    def open(self, index, **kwargs):
        """Open a closed index."""

    def close(self, index, **kwargs):
        """Close an index."""

    def clear_cache(self, index=None, **kwargs):
        """Clear index caches."""

    def analyze(self, body=None, index=None, **kwargs):
        """Analyze text using index analyzers."""

    def validate_query(self, body=None, index=None, **kwargs):
        """Validate a query without executing it."""


# Cluster-wide operations for monitoring health, managing settings, and
# administrative tasks.
class ClusterClient:
    """Cluster-level operations: health, state, statistics, and settings.

    Signature reference only: every method body consists of its docstring.
    """

    def health(self, index=None, **kwargs):
        """
        Get cluster health status.

        Parameters:
        - index (str/list, optional): Index name(s) to check
        - level (str, optional): Detail level ('cluster', 'indices', 'shards')
        - local (bool, optional): Return local cluster state
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout
        - wait_for_active_shards (str/int, optional): Wait for active shards
        - wait_for_nodes (str, optional): Wait for N nodes
        - wait_for_events (str, optional): Wait for priority events
        - wait_for_no_relocating_shards (bool, optional): Wait for no relocating shards
        - wait_for_no_initializing_shards (bool, optional): Wait for no initializing shards
        - wait_for_status (str, optional): Wait for status ('green', 'yellow', 'red')

        Returns:
        dict: Cluster health information
        """

    def state(self, metric=None, index=None, **kwargs):
        """Get comprehensive cluster state information."""

    def stats(self, node_id=None, **kwargs):
        """Get cluster statistics."""

    def pending_tasks(self, **kwargs):
        """Get cluster pending tasks."""

    def get_settings(self, **kwargs):
        """Get cluster settings."""

    def put_settings(self, body, **kwargs):
        """Update cluster settings."""

    def reroute(self, body=None, **kwargs):
        """Manually reroute cluster shards."""

    def allocation_explain(self, body=None, **kwargs):
        """Explain shard allocation decisions."""

    def remote_info(self, **kwargs):
        """Get remote cluster information."""


# Node-level operations for monitoring and managing individual cluster nodes.
class NodesClient:
    """Node-level operations: info, statistics, hot threads, and usage.

    Signature reference only: every method body consists of its docstring.
    """

    def info(self, node_id=None, metric=None, **kwargs):
        """
        Get node information.

        Parameters:
        - node_id (str/list, optional): Node ID(s) or names
        - metric (str/list, optional): Information metrics to retrieve
        - flat_settings (bool, optional): Return flat settings format
        - timeout (str, optional): Operation timeout

        Returns:
        dict: Node information
        """

    def stats(self, node_id=None, metric=None, **kwargs):
        """Get node statistics."""

    def hot_threads(self, node_id=None, **kwargs):
        """Get hot threads information for nodes."""

    def usage(self, node_id=None, metric=None, **kwargs):
        """Get node feature usage statistics."""

    def reload_secure_settings(self, node_id=None, body=None, **kwargs):
        """Reload secure settings on nodes."""


# Human-readable cluster information in tabular format, useful for monitoring
# and debugging.
class CatClient:
    """Tabular ("cat") views of cluster, node, index, and shard information.

    Signature reference only: every method body consists of its docstring.
    """

    def aliases(self, name=None, **kwargs):
        """Show index aliases in tabular format."""

    def allocation(self, node_id=None, **kwargs):
        """Show shard allocation across nodes."""

    def count(self, index=None, **kwargs):
        """Show document counts per index."""

    def health(self, **kwargs):
        """Show cluster health in tabular format."""

    def indices(self, index=None, **kwargs):
        """
        Show indices information in tabular format.

        Parameters:
        - index (str/list, optional): Index name(s)
        - bytes (str, optional): Unit for byte values ('b', 'k', 'm', 'g', 't', 'p')
        - format (str, optional): Response format ('json', 'yaml')
        - h (str/list, optional): Column headers to display
        - help (bool, optional): Show column descriptions
        - local (bool, optional): Return local information
        - master_timeout (str, optional): Master node timeout
        - pri (bool, optional): Show only primary shards
        - s (str/list, optional): Sort columns
        - time (str, optional): Time unit ('d', 'h', 'm', 's', 'ms', 'micros', 'nanos')
        - v (bool, optional): Verbose output with headers

        Returns:
        str: Tabular index information
        """

    def master(self, **kwargs):
        """Show master node information."""

    def nodes(self, **kwargs):
        """Show node information in tabular format."""

    def pending_tasks(self, **kwargs):
        """Show pending cluster tasks."""

    def plugins(self, **kwargs):
        """Show installed plugins per node."""

    def recovery(self, index=None, **kwargs):
        """Show index recovery information."""

    def repositories(self, **kwargs):
        """Show snapshot repositories."""

    def segments(self, index=None, **kwargs):
        """Show index segment information."""

    def shards(self, index=None, **kwargs):
        """Show shard information."""

    def snapshots(self, repository=None, **kwargs):
        """Show snapshot information."""

    def tasks(self, **kwargs):
        """Show currently running tasks."""

    def templates(self, name=None, **kwargs):
        """Show index templates."""

    def thread_pool(self, thread_pool_patterns=None, **kwargs):
        """Show thread pool statistics."""


# Management of ingest pipelines for preprocessing documents before indexing.
class IngestClient:
    """Ingest pipeline operations: create, fetch, delete, and simulate.

    Signature reference only: every method body consists of its docstring.
    """

    def put_pipeline(self, id, body, **kwargs):
        """
        Create or update an ingest pipeline.

        Parameters:
        - id (str): Pipeline ID
        - body (dict): Pipeline definition with processors
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout

        Body format:
        {
            "description": "Pipeline description",
            "processors": [
                {
                    "set": {
                        "field": "processed",
                        "value": true
                    }
                }
            ]
        }

        Returns:
        dict: Pipeline creation response
        """

    def get_pipeline(self, id=None, **kwargs):
        """Get ingest pipeline definitions."""

    def delete_pipeline(self, id, **kwargs):
        """Delete an ingest pipeline."""

    def simulate(self, body, id=None, **kwargs):
        """Simulate pipeline execution on sample documents."""

    def processor_grok(self, **kwargs):
        """Get grok processor patterns."""


# Backup and restore operations for indices and cluster state.
class SnapshotClient:
    """Snapshot and repository operations for backup and restore.

    Signature reference only: every method body consists of its docstring.
    """

    def create_repository(self, repository, body, **kwargs):
        """
        Create a snapshot repository.

        Parameters:
        - repository (str): Repository name
        - body (dict): Repository configuration
        - master_timeout (str, optional): Master node timeout
        - timeout (str, optional): Operation timeout
        - verify (bool, optional): Verify repository after creation

        Returns:
        dict: Repository creation response
        """

    def get_repository(self, repository=None, **kwargs):
        """Get snapshot repository information."""

    def delete_repository(self, repository, **kwargs):
        """Delete a snapshot repository."""

    def verify_repository(self, repository, **kwargs):
        """Verify repository integrity."""

    def create(self, repository, snapshot, body=None, **kwargs):
        """Create a snapshot of indices."""

    def get(self, repository, snapshot, **kwargs):
        """Get snapshot information."""

    def delete(self, repository, snapshot, **kwargs):
        """Delete a snapshot."""

    def restore(self, repository, snapshot, body=None, **kwargs):
        """Restore indices from a snapshot."""

    def status(self, repository=None, snapshot=None, **kwargs):
        """Get snapshot status information."""

    def cleanup_repository(self, repository, **kwargs):
        """Clean up stale repository data."""


# Monitor and manage long-running operations and background tasks.
class TasksClient:
    """Task operations: list, inspect, and cancel tasks.

    Signature reference only: every method body consists of its docstring.
    """

    def list(self, **kwargs):
        """
        List currently running tasks.

        Parameters:
        - nodes (str/list, optional): Node IDs to query
        - actions (str/list, optional): Action types to filter
        - detailed (bool, optional): Return detailed task information
        - parent_task_id (str, optional): Filter by parent task ID
        - wait_for_completion (bool, optional): Wait for task completion
        - timeout (str, optional): Operation timeout
        - group_by (str, optional): Group tasks by ('nodes', 'parents', 'none')

        Returns:
        dict: Task information
        """

    def get(self, task_id, **kwargs):
        """Get information about a specific task."""

    def cancel(self, task_id=None, **kwargs):
        """Cancel tasks."""


# Security-related operations when the security plugin is installed.
class SecurityClient:
    """Security operations: account, user, role, and role-mapping management.

    Signature reference only: every method body consists of its docstring.
    """

    def get_account_details(self, **kwargs):
        """Get current user account details."""

    def change_password(self, body, **kwargs):
        """Change current user password."""

    def get_user(self, username=None, **kwargs):
        """Get user information."""

    def create_user(self, username, body, **kwargs):
        """Create a new user."""

    def patch_user(self, username, body, **kwargs):
        """Update user attributes."""

    def delete_user(self, username, **kwargs):
        """Delete a user."""

    def get_role(self, role=None, **kwargs):
        """Get role information."""

    def create_role(self, role, body, **kwargs):
        """Create a new role."""

    def patch_role(self, role, body, **kwargs):
        """Update role permissions."""

    def delete_role(self, role, **kwargs):
        """Delete a role."""

    def get_role_mapping(self, role=None, **kwargs):
        """Get role mapping information."""

    def create_role_mapping(self, role, body, **kwargs):
        """Create role mapping."""

    def patch_role_mapping(self, role, body, **kwargs):
        """Update role mapping."""

    def delete_role_mapping(self, role, **kwargs):
        """Delete role mapping."""


# Additional specialized APIs for advanced OpenSearch features.
class DanglingIndicesClient:
    """Operations on dangling indices: list, import, and delete.

    Signature reference only: every method body consists of its docstring.
    """

    def list_dangling_indices(self, **kwargs):
        """List dangling indices."""

    def import_dangling_index(self, index_uuid, **kwargs):
        """Import a dangling index."""

    def delete_dangling_index(self, index_uuid, **kwargs):
        """Delete a dangling index."""
class FeaturesClient:
    """Operations for querying and resetting features.

    Signature reference only: every method body consists of its docstring.
    """

    def get_features(self, **kwargs):
        """Get available features."""

    def reset_features(self, **kwargs):
        """Reset features to default state."""
class RemoteClient:
    """Remote cluster information.

    Signature reference only: the method body consists of its docstring.
    """

    def info(self, **kwargs):
        """Get remote cluster information."""
class SearchPipelineClient:
    """Search pipeline operations: create/update, fetch, and delete.

    Signature reference only: every method body consists of its docstring.
    """

    def put(self, id, body, **kwargs):
        """Create or update a search pipeline."""

    def get(self, id=None, **kwargs):
        """Get search pipeline definitions."""

    def delete(self, id, **kwargs):
        """Delete a search pipeline."""
class WlmClient:
    """Workload management operations on query groups.

    Signature reference only: every method body consists of its docstring.
    """

    def create_query_group(self, body, **kwargs):
        """Create a workload management query group."""

    def get_query_group(self, name=None, **kwargs):
        """Get query group information."""

    def update_query_group(self, name, body, **kwargs):
        """Update query group settings."""

    def delete_query_group(self, name, **kwargs):
        """Delete a query group."""


# Import used by the usage examples that follow.
from opensearchpy import OpenSearch
# Index management example: talks to an OpenSearch node at localhost:9200.
client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Create an index with settings and mappings
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "analysis": {
            "analyzer": {
                "custom_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "stop"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "custom_analyzer"
            },
            "publish_date": {
                "type": "date"
            },
            "category": {
                "type": "keyword"
            }
        }
    }
}
response = client.indices.create(index='my-index', body=index_body)
print(f"Index created: {response['acknowledged']}")

# Update index settings
settings_update = {
    "settings": {
        "refresh_interval": "30s"
    }
}
client.indices.put_settings(index='my-index', body=settings_update)

# Add a field to existing mapping
mapping_update = {
    "properties": {
        "tags": {
            "type": "keyword"
        }
    }
}
client.indices.put_mapping(index='my-index', body=mapping_update)

# Check if index exists
if client.indices.exists(index='my-index'):
    print("Index exists")

# Get index information
index_info = client.indices.get(index='my-index')
print(f"Index settings: {index_info['my-index']['settings']}")

# Check cluster health
health = client.cluster.health()
print(f"Cluster status: {health['status']}")
print(f"Active shards: {health['active_shards']}")

# Get detailed cluster state
# 'master_node' must be requested explicitly: when a metric filter is given,
# the response contains only the requested sections, so reading
# state['master_node'] without it would raise KeyError.
state = client.cluster.state(metric=['master_node', 'nodes', 'routing_table'])
print(f"Master node: {state['master_node']}")

# Monitor cluster statistics
stats = client.cluster.stats()
print(f"Total indices: {stats['indices']['count']}")
print(f"Total nodes: {stats['nodes']['count']['total']}")

# Check pending tasks
tasks = client.cluster.pending_tasks()
if tasks['tasks']:
    print(f"Pending tasks: {len(tasks['tasks'])}")

# Get all node information
nodes_info = client.nodes.info()
# Only the per-node payload is used, so iterate the values, not the items.
for node_info in nodes_info['nodes'].values():
    print(f"Node: {node_info['name']} ({node_info['version']})")

# Get node statistics
nodes_stats = client.nodes.stats(metric=['jvm', 'indices'])
for stats in nodes_stats['nodes'].values():
    jvm_mem = stats['jvm']['mem']
    print(f"Node JVM heap used: {jvm_mem['heap_used_percent']}%")

# Get indices in tabular format
indices_info = client.cat.indices(v=True, h=['index', 'docs.count', 'store.size'])
print(indices_info)

# Monitor cluster health
health_info = client.cat.health(v=True)
print(health_info)

# Check shard allocation
# The disk-usage column of _cat/allocation is 'disk.percent';
# 'disk.used_percent' is a _cat/nodes column and is not available here.
allocation_info = client.cat.allocation(v=True, h=['node', 'shards', 'disk.percent'])
print(allocation_info)

# Create an ingest pipeline
# Pipeline definition: three processors applied in order to each document.
pipeline_body = {
    "description": "Log processing pipeline",
    "processors": [
        # 1. Split the raw line into timestamp / level / msg fields.
        {
            "grok": {
                "field": "message",
                "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"],
            },
        },
        # 2. Parse the extracted timestamp string.
        {
            "date": {
                "field": "timestamp",
                "formats": ["ISO8601"],
            },
        },
        # 3. Lowercase the extracted log level.
        {
            "lowercase": {
                "field": "level",
            },
        },
    ],
}
client.ingest.put_pipeline(id='log-pipeline', body=pipeline_body)

# Test the pipeline with sample data
simulate_body = {
    "pipeline": pipeline_body,
    "docs": [
        {
            "_source": {
                "message": "2024-01-01T10:00:00.000Z ERROR Database connection failed",
            },
        },
    ],
}
result = client.ingest.simulate(body=simulate_body)
print(f"Processed document: {result['docs'][0]['doc']['_source']}")

# Create a filesystem snapshot repository
repo_body = {
    "type": "fs",
    "settings": {
        "location": "/backup/opensearch",
        "compress": True
    }
}
client.snapshot.create_repository(repository='backup-repo', body=repo_body)

# Create a snapshot of specific indices
snapshot_body = {
    "indices": "my-index,another-index",
    "ignore_unavailable": True,
    "include_global_state": False,
    "metadata": {
        "taken_by": "python-client",
        "taken_because": "daily backup"
    }
}
client.snapshot.create(
    repository='backup-repo',
    snapshot='daily-snapshot-2024-01-01',
    body=snapshot_body,
    wait_for_completion=True
)

# List snapshots
snapshots = client.snapshot.get(repository='backup-repo', snapshot='*')
for snapshot in snapshots['snapshots']:
    print(f"Snapshot: {snapshot['snapshot']} - State: {snapshot['state']}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-opensearch-py