CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-opensearch-py

Python client for OpenSearch providing comprehensive search, indexing, and cluster management capabilities

Pending
Overview
Eval results
Files

docs/plugin-apis.md

Plugin APIs

Dedicated client interfaces for OpenSearch plugins providing advanced functionality like machine learning, security analytics, alerting, and specialized search capabilities. These plugin APIs are accessible as properties on both OpenSearch and AsyncOpenSearch client instances.

Capabilities

Machine Learning Plugin

Advanced machine learning capabilities including model management, training, and inference operations.

class MlClient:
    """
    Interface outline for the Machine Learning plugin client.

    Covers model management (register, get, delete, search), deployment
    (deploy, undeploy), inference (predict) and task lookup (get_task).
    Only signatures and docstrings are listed; method bodies are not part of
    this outline, and how ``**kwargs`` is consumed is not shown here.
    The usage examples on this page reach this client as ``client.ml``.
    """

    def register_model(self, body, **kwargs):
        """
        Register a machine learning model.

        Parameters:
        - body (dict): Model registration configuration

        Body format:
        {
            "name": "my-model",
            "version": "1.0.0",
            "model_format": "TORCH_SCRIPT",
            "model_config": {
                "model_type": "bert",
                "embedding_dimension": 768,
                "framework_type": "sentence_transformers"
            }
        }

        Returns:
        dict: Model registration response with task_id
        """

    def get_model(self, model_id, **kwargs):
        """Get model information and status."""

    def delete_model(self, model_id, **kwargs):
        """Delete a registered model."""

    def deploy_model(self, model_id, **kwargs):
        """Deploy a model to make it available for inference."""

    def undeploy_model(self, model_id, **kwargs):
        """Undeploy a model to stop inference."""

    def predict(self, model_id, body, **kwargs):
        """
        Run inference using a deployed model.

        The usage example on this page passes a body with a ``text_docs`` list
        and reads ``inference_results`` from the response.
        """

    def get_task(self, task_id, **kwargs):
        """
        Get ML task status and results.

        The usage example on this page reads ``state`` and ``model_id`` from
        the response.
        """

    def search_models(self, body=None, **kwargs):
        """Search for registered models."""

Neural Search Plugin

Status and statistics reporting for the neural-search plugin, which provides neural network-based search capabilities.

class NeuralClient:
    """
    Interface outline for the neural-search plugin client.

    Exposes a single ``stats`` call. Only the signature and docstring are
    listed; the method body is not part of this outline.
    """

    def stats(self, node_id=None, stat=None, **kwargs):
        """
        Provide information about the current status of the neural-search plugin.

        Parameters:
        - node_id (str, optional): Comma-separated list of node IDs or names
        - stat (str, optional): Comma-separated list of stats to retrieve

        The options below are not named in the signature; presumably they are
        supplied through **kwargs (TODO confirm against the implementation):
        - flat_stat_paths (bool): Return stats in flat form for readability
        - include_all_nodes (bool): Include aggregated statistics across all nodes
        - include_individual_nodes (bool): Include statistics for individual nodes
        - include_info (bool): Include cluster-wide information
        - include_metadata (bool): Return stat metadata instead of raw values

        Returns:
        dict: Neural search plugin statistics and status information
        """

K-NN Search Plugin

K-nearest neighbor search for vector similarity operations.

class KnnClient:
    """
    Interface outline for the k-NN (k-nearest neighbor) plugin client.

    Covers vector search, k-NN model management (train, get, delete),
    index warmup and plugin statistics. Only signatures and docstrings are
    listed; method bodies are not part of this outline.
    The usage examples on this page reach this client as ``client.knn``.
    """

    def search(self, body, index=None, **kwargs):
        """
        Perform k-NN vector search.

        Parameters:
        - body (dict): k-NN search query body
        - index (str/list, optional): Index name(s)

        Body format ("vector_field" stands for the name of the vector field
        being queried; the usage example uses "document_embedding"):
        {
            "query": {
                "knn": {
                    "vector_field": {
                        "vector": [0.1, 0.2, 0.3, ...],
                        "k": 10
                    }
                }
            }
        }

        Returns:
        dict: k-NN search results with similarity scores
        """

    def train_model(self, model_id, body, **kwargs):
        """Train a k-NN model for vector search optimization."""

    def get_model(self, model_id, **kwargs):
        """Get k-NN model information."""

    def delete_model(self, model_id, **kwargs):
        """Delete a k-NN model."""

    def warmup(self, index, **kwargs):
        """Warmup k-NN indices for improved performance."""

    def stats(self, node_id=None, stat_name=None, **kwargs):
        """Get k-NN plugin statistics."""

Security Analytics Plugin

Security threat detection and analytics capabilities.

class SecurityAnalyticsClient:
    """
    Interface outline for the Security Analytics plugin client.

    Covers detector management (create, get, update, delete, search),
    retrieval of findings and acknowledgement of alerts. Only signatures and
    docstrings are listed; method bodies are not part of this outline.
    """

    def create_detector(self, body, **kwargs):
        """
        Create a security analytics detector.

        Parameters:
        - body (dict): Detector configuration

        Body format (written as JSON; in a Python dict use ``True`` where the
        example shows ``true``):
        {
            "name": "network-detector",
            "detector_type": "network",
            "enabled": true,
            "schedule": {
                "period": {
                    "interval": 5,
                    "unit": "MINUTES"
                }
            },
            "inputs": [
                {
                    "detector_input": {
                        "description": "Network logs",
                        "indices": ["network-logs*"],
                        "rules": []
                    }
                }
            ]
        }

        Returns:
        dict: Detector creation response
        """

    def get_detector(self, detector_id, **kwargs):
        """Get detector configuration and status."""

    def update_detector(self, detector_id, body, **kwargs):
        """Update detector configuration."""

    def delete_detector(self, detector_id, **kwargs):
        """Delete a detector."""

    def search_detectors(self, body=None, **kwargs):
        """Search for detectors."""

    def get_findings(self, **kwargs):
        """Get security findings from detectors."""

    def acknowledge_alerts(self, body, **kwargs):
        """Acknowledge security alerts."""

Alerting Plugin

Comprehensive alerting system for monitoring and notifications.

class AlertingClient:
    """
    Interface outline for the Alerting plugin client.

    Covers monitors (create, get, update, delete, search, run), alert
    acknowledgement and notification destinations (create, get, update,
    delete). Only signatures and docstrings are listed; method bodies are not
    part of this outline.
    The usage examples on this page reach this client as ``client.alerting``.
    """

    def create_monitor(self, body, **kwargs):
        """
        Create an alerting monitor.

        Parameters:
        - body (dict): Monitor configuration

        Body format (written as JSON; in a Python dict use ``True`` where the
        example shows ``true``):
        {
            "name": "high-error-rate-monitor",
            "type": "monitor",
            "monitor_type": "query_level_monitor",
            "enabled": true,
            "schedule": {
                "period": {
                    "interval": 1,
                    "unit": "MINUTES"
                }
            },
            "inputs": [
                {
                    "search": {
                        "indices": ["application-logs*"],
                        "query": {
                            "query": {
                                "bool": {
                                    "filter": [
                                        {
                                            "range": {
                                                "@timestamp": {
                                                    "gte": "now-5m"
                                                }
                                            }
                                        },
                                        {
                                            "term": {
                                                "level": "ERROR"
                                            }
                                        }
                                    ]
                                }
                            },
                            "aggs": {
                                "error_count": {
                                    "value_count": {
                                        "field": "level"
                                    }
                                }
                            }
                        }
                    }
                }
            ],
            "triggers": [
                {
                    "name": "high-error-trigger",
                    "severity": "2",
                    "condition": {
                        "script": {
                            "source": "ctx.results[0].aggregations.error_count.value > 10"
                        }
                    },
                    "actions": [
                        {
                            "name": "send-email",
                            "destination_id": "email-destination",
                            "message_template": {
                                "source": "High error rate detected: {{ctx.results.0.aggregations.error_count.value}} errors in the last 5 minutes"
                            }
                        }
                    ]
                }
            ]
        }

        Returns:
        dict: Monitor creation response (the usage example on this page reads
        ``_id`` from it)
        """

    def get_monitor(self, monitor_id, **kwargs):
        """Get monitor configuration and status."""

    def update_monitor(self, monitor_id, body, **kwargs):
        """Update monitor configuration."""

    def delete_monitor(self, monitor_id, **kwargs):
        """Delete a monitor."""

    def search_monitors(self, body=None, **kwargs):
        """Search for monitors."""

    def run_monitor(self, monitor_id, **kwargs):
        """Manually run a monitor."""

    def acknowledge_alert(self, monitor_id, body=None, **kwargs):
        """Acknowledge alerts from a monitor."""

    def create_destination(self, body, **kwargs):
        """
        Create notification destination.

        The usage example on this page reads ``_id`` from the response and
        uses it as a trigger action's ``destination_id``.
        """

    def get_destination(self, destination_id, **kwargs):
        """Get notification destination."""

    def update_destination(self, destination_id, body, **kwargs):
        """Update notification destination."""

    def delete_destination(self, destination_id, **kwargs):
        """Delete notification destination."""

SQL Plugin

SQL query interface for OpenSearch data.

class SqlClient:
    """
    Interface outline for the SQL plugin client.

    Covers query execution, query explanation, cursor closing and plugin
    statistics. Only signatures and docstrings are listed; method bodies are
    not part of this outline.
    The usage examples on this page reach this client as ``client.sql``.
    """

    def query(self, body, **kwargs):
        """
        Execute SQL query against OpenSearch indices.

        Parameters:
        - body (dict): SQL query request

        Body format:
        {
            "query": "SELECT customer_name, order_total FROM orders WHERE order_date >= '2024-01-01' ORDER BY order_total DESC LIMIT 10",
            "fetch_size": 1000,
            "format": "json"
        }

        Returns:
        dict: Query results in specified format (the usage example on this
        page iterates the ``datarows`` entry of the response)
        """

    def explain(self, body, **kwargs):
        """Explain SQL query execution plan."""

    def close(self, cursor, **kwargs):
        """Close SQL cursor for pagination."""

    def get_stats(self, **kwargs):
        """Get SQL plugin statistics."""

    def post_stats(self, body, **kwargs):
        """Update SQL plugin statistics."""

PPL (Piped Processing Language) Plugin

Piped processing language for data analysis and transformation.

class PplClient:
    """
    Interface outline for the PPL (Piped Processing Language) plugin client.

    Covers query execution and query explanation. Only signatures and
    docstrings are listed; method bodies are not part of this outline.
    """

    def query(self, body, **kwargs):
        """
        Execute PPL query for data processing.

        Parameters:
        - body (dict): PPL query request

        Body format:
        {
            "query": "source=logs | where level='ERROR' | stats count() by service | sort count desc",
            "format": "json"
        }

        Returns:
        dict: PPL query results
        """

    def explain(self, body, **kwargs):
        """Explain PPL query execution plan."""

Additional Plugin APIs

Extended plugin capabilities for specialized use cases.

class AsynchronousSearchClient:
    """
    Interface outline for the asynchronous search plugin client.

    Covers submitting a search, fetching or deleting it by id, and reading
    statistics. Only signatures and docstrings are listed; method bodies are
    not part of this outline.
    """

    def submit(self, body, index=None, **kwargs):
        """Submit asynchronous search request."""

    # NOTE(review): the parameter name `id` shadows the builtin; it is kept
    # because it is part of the call signature.
    def get(self, id, **kwargs):
        """Get asynchronous search results."""

    def delete(self, id, **kwargs):
        """Delete asynchronous search."""

    def stats(self, **kwargs):
        """Get asynchronous search statistics."""

class FlowFrameworkClient:
    """
    Interface outline for the flow framework plugin client.

    Covers creating and fetching workflow templates and provisioning or
    deprovisioning a workflow's resources. Only signatures and docstrings are
    listed; method bodies are not part of this outline.
    """

    def create(self, body, **kwargs):
        """Create a workflow template."""

    def get_template(self, workflow_id, **kwargs):
        """Get workflow template."""

    def provision(self, workflow_id, **kwargs):
        """Provision resources for workflow."""

    def deprovision(self, workflow_id, **kwargs):
        """Deprovision workflow resources."""

class IndexManagementClient:
    """
    Interface outline for the index management plugin client.

    Covers policy management (create, get, update, delete), attaching a
    policy to an index or removing it, and explaining an index's management
    status. Only signatures and docstrings are listed; method bodies are not
    part of this outline.
    The usage examples on this page reach this client as
    ``client.index_management``.
    """

    def create_policy(self, policy_id, body, **kwargs):
        """Create index management policy."""

    def get_policy(self, policy_id, **kwargs):
        """Get index management policy."""

    def update_policy(self, policy_id, body, **kwargs):
        """Update index management policy."""

    def delete_policy(self, policy_id, **kwargs):
        """Delete index management policy."""

    def add_policy(self, index, body, **kwargs):
        """
        Add policy to index.

        The usage example on this page passes a body of the form
        ``{"policy_id": ...}`` and an index pattern such as ``'logs-*'``.
        """

    def remove_policy(self, index, **kwargs):
        """Remove policy from index."""

    def explain_index(self, index, **kwargs):
        """Explain index management status."""

class NotificationsClient:
    """
    Interface outline for the notifications plugin client.

    Covers notification configurations (create, list, update, delete) and
    sending a test notification. Only signatures and docstrings are listed;
    method bodies are not part of this outline.
    """

    def create_config(self, body, **kwargs):
        """Create notification configuration."""

    def get_configs(self, **kwargs):
        """Get notification configurations."""

    def update_config(self, config_id, body, **kwargs):
        """Update notification configuration."""

    def delete_config(self, config_id, **kwargs):
        """Delete notification configuration."""

    def send_test(self, config_id, **kwargs):
        """Send test notification."""

class RollupsClient:
    """
    Interface outline for the rollups plugin client.

    Covers rollup jobs: create (put), get, delete, start, stop and explain.
    Only signatures and docstrings are listed; method bodies are not part of
    this outline.
    """

    def put(self, rollup_id, body, **kwargs):
        """Create rollup job."""

    def get(self, rollup_id=None, **kwargs):
        """
        Get rollup job.

        ``rollup_id`` is optional; what is returned when it is omitted is not
        shown in this outline.
        """

    def delete(self, rollup_id, **kwargs):
        """Delete rollup job."""

    def start(self, rollup_id, **kwargs):
        """Start rollup job."""

    def stop(self, rollup_id, **kwargs):
        """Stop rollup job."""

    def explain(self, rollup_id, **kwargs):
        """Explain rollup job status."""

class GeospatialClient:
    """
    Interface outline for the geospatial plugin client.

    Covers indexing GeoJSON data, geospatial search and plugin statistics.
    Only signatures and docstrings are listed; method bodies are not part of
    this outline.
    """

    def put_geojson(self, index, body, **kwargs):
        """Index GeoJSON data for geospatial operations."""

    def search_geospatial(self, body, index=None, **kwargs):
        """Search using geospatial queries and filters."""

    def get_stats(self, **kwargs):
        """Get geospatial plugin statistics."""

class ObservabilityClient:
    """
    Interface outline for the observability plugin client.

    Covers observability objects (dashboards, visualizations, etc.): create,
    get, list and delete. Only signatures and docstrings are listed; method
    bodies are not part of this outline.
    """

    def create_object(self, body, **kwargs):
        """Create observability object (dashboard, visualization, etc.)."""

    def get_object(self, object_id, **kwargs):
        """Get observability object configuration."""

    def list_objects(self, **kwargs):
        """List observability objects."""

    def delete_object(self, object_id, **kwargs):
        """Delete observability object."""

class ReplicationClient:
    """
    Interface outline for the cross-cluster replication plugin client.

    Covers starting, stopping, pausing and resuming replication and reading
    replication status. Only signatures and docstrings are listed; method
    bodies are not part of this outline.
    """

    def start_replication(self, leader_index, follower_index, body, **kwargs):
        """Start cross-cluster replication."""

    def stop_replication(self, follower_index, **kwargs):
        """Stop replication on follower index."""

    def pause_replication(self, follower_index, **kwargs):
        """Pause replication."""

    def resume_replication(self, follower_index, **kwargs):
        """Resume paused replication."""

    def get_replication_status(self, index=None, **kwargs):
        """
        Get replication status.

        ``index`` is optional; what is returned when it is omitted is not
        shown in this outline.
        """

class SearchRelevanceClient:
    """
    Interface outline for the search relevance plugin client.

    Covers relevance evaluation and comparison of search results between
    configurations. Only signatures and docstrings are listed; method bodies
    are not part of this outline.
    """

    def search_relevance(self, body, **kwargs):
        """Execute search relevance evaluation."""

    def compare_search_results(self, body, **kwargs):
        """Compare search results between different configurations."""

class TransformsClient:
    """
    Interface outline for the transforms plugin client.

    Covers transform jobs: create (put), get, delete, start, stop and
    preview. Only signatures and docstrings are listed; method bodies are not
    part of this outline.
    """

    def put(self, transform_id, body, **kwargs):
        """Create transform job."""

    def get(self, transform_id=None, **kwargs):
        """
        Get transform job.

        ``transform_id`` is optional; what is returned when it is omitted is
        not shown in this outline.
        """

    def delete(self, transform_id, **kwargs):
        """Delete transform job."""

    def start(self, transform_id, **kwargs):
        """Start transform job."""

    def stop(self, transform_id, **kwargs):
        """Stop transform job."""

    def preview(self, body, **kwargs):
        """Preview transform results."""

Usage Examples

Machine Learning Operations

import time

from opensearchpy import OpenSearch

client = OpenSearch([{'host': 'localhost', 'port': 9200}])

# Register a pre-trained model
model_config = {
    "name": "sentence-transformer-model",
    "version": "1.0.0",
    "model_format": "TORCH_SCRIPT",
    "model_config": {
        "model_type": "bert",
        "embedding_dimension": 768,
        "framework_type": "sentence_transformers"
    },
    "url": "https://example.com/model.zip"
}

# Registration is asynchronous: the response only carries a task_id.
response = client.ml.register_model(body=model_config)
task_id = response['task_id']

# Poll the task until it leaves the in-progress states. A single immediate
# check would usually still see an unfinished task and silently skip the
# deployment below.
IN_PROGRESS_STATES = ('CREATED', 'RUNNING')
POLL_INTERVAL_SECONDS = 2
MAX_WAIT_SECONDS = 300

deadline = time.monotonic() + MAX_WAIT_SECONDS
task_status = client.ml.get_task(task_id=task_id)
while task_status['state'] in IN_PROGRESS_STATES and time.monotonic() < deadline:
    time.sleep(POLL_INTERVAL_SECONDS)
    task_status = client.ml.get_task(task_id=task_id)

print(f"Registration status: {task_status['state']}")

# Deploy the model once registration is complete
if task_status['state'] == 'COMPLETED':
    model_id = task_status['model_id']
    client.ml.deploy_model(model_id=model_id)

    # Use model for inference
    inference_body = {
        "text_docs": ["What is machine learning?", "How does AI work?"]
    }

    results = client.ml.predict(model_id=model_id, body=inference_body)
    print(f"Embeddings: {results['inference_results']}")
else:
    # Make the skipped deployment visible instead of ending silently.
    print(f"Model not deployed: registration task ended in state {task_status['state']!r}")

K-NN Vector Search

# Build the k-NN query from named parts, then run it.

# Only documents in the "technology" category are considered.
category_filter = {"term": {"category": "technology"}}

# Clause for the "document_embedding" vector field.
embedding_clause = {
    "vector": [0.1, 0.2, 0.3, 0.4, 0.5],  # 5-dimensional vector
    "k": 10,
    "filter": category_filter,
}

knn_query = {
    "size": 10,
    "query": {"knn": {"document_embedding": embedding_clause}},
}

results = client.knn.search(index='documents', body=knn_query)

# Print the score and title of every returned hit.
matching_hits = results['hits']['hits']
for hit in matching_hits:
    print(f"Score: {hit['_score']}, Document: {hit['_source']['title']}")

Alerting Configuration

# Step 1: create the email destination that alerts are sent to.
email_settings = {
    "email_account_id": "default-email-account",
    "recipients": ["ops-team@company.com"],
    "subject": "OpenSearch Alert: {{ctx.monitor.name}}",
}
email_destination = {
    "name": "operations-email",
    "type": "email",
    "email": email_settings,
}

destination_response = client.alerting.create_destination(body=email_destination)
destination_id = destination_response['_id']

# Step 2: describe the search the monitor runs: ERROR-level entries from the
# last five minutes, counted by the "error_count" aggregation.
recent_window_filter = {"range": {"@timestamp": {"gte": "now-5m"}}}
error_level_filter = {"term": {"level": "ERROR"}}
error_search = {
    "indices": ["application-logs*"],
    "query": {
        "query": {"bool": {"filter": [recent_window_filter, error_level_filter]}},
        "aggs": {"error_count": {"value_count": {"field": "level"}}},
    },
}

# Step 3: describe the trigger and the action it fires via the destination
# created above.
notify_action = {
    "name": "notify-ops-team",
    "destination_id": destination_id,
    "message_template": {
        "source": "Alert: {{ctx.results.0.aggregations.error_count.value}} errors detected in the last 5 minutes"
    },
}
error_trigger = {
    "name": "high-error-rate",
    "severity": "2",
    "condition": {
        "script": {
            "source": "ctx.results[0].aggregations.error_count.value > 50"
        }
    },
    "actions": [notify_action],
}

# Step 4: assemble the monitor (runs every 5 minutes) and create it.
monitor_config = {
    "name": "application-error-monitor",
    "type": "monitor",
    "monitor_type": "query_level_monitor",
    "enabled": True,
    "schedule": {"period": {"interval": 5, "unit": "MINUTES"}},
    "inputs": [{"search": error_search}],
    "triggers": [error_trigger],
}

monitor_response = client.alerting.create_monitor(body=monitor_config)
print(f"Monitor created: {monitor_response['_id']}")

SQL Queries

# Top ten customers by total spend since 2024-01-01.
top_customers_sql = """
        SELECT 
            customer_name,
            COUNT(*) as order_count,
            SUM(order_total) as total_spent
        FROM orders 
        WHERE order_date >= '2024-01-01'
        GROUP BY customer_name
        ORDER BY total_spent DESC
        LIMIT 10
    """

sql_query = {
    "query": top_customers_sql,
    "format": "json",
    "fetch_size": 1000,
}

results = client.sql.query(body=sql_query)

# Each data row holds the three selected columns in SELECT order.
for customer, count, total in results['datarows']:
    print(f"Customer: {customer}, Orders: {count}, Total: ${total}")

# Ask for the execution plan of a second query instead of running it.
explain_query = {"query": "SELECT * FROM orders WHERE customer_id = 123", "format": "json"}

plan = client.sql.explain(body=explain_query)
print(f"Execution plan: {plan}")

Index Management Policies

# Log retention policy built from three named states: hot -> warm -> delete.

# "hot": roll the index over, then move to "warm" after 7 days.
hot_state = {
    "name": "hot",
    "actions": [
        {
            "rollover": {
                "min_size": "50gb",
                "min_doc_count": 1000000,
                "min_index_age": "7d",
            }
        }
    ],
    "transitions": [
        {"state_name": "warm", "conditions": {"min_index_age": "7d"}}
    ],
}

# "warm": drop replicas, then move to "delete" after 30 days.
warm_state = {
    "name": "warm",
    "actions": [{"replica_count": {"number_of_replicas": 0}}],
    "transitions": [
        {"state_name": "delete", "conditions": {"min_index_age": "30d"}}
    ],
}

# "delete": remove the index.
delete_state = {
    "name": "delete",
    "actions": [{"delete": {}}],
}

policy_body = {
    "policy": {
        "description": "Log retention policy",
        "default_state": "hot",
        "states": [hot_state, warm_state, delete_state],
    }
}

# Create the policy
policy_response = client.index_management.create_policy(
    policy_id='log-retention-policy', body=policy_body
)

# Apply policy to index pattern
apply_policy = {"policy_id": "log-retention-policy"}
client.index_management.add_policy(index='logs-*', body=apply_policy)

# Check policy status
status = client.index_management.explain_index(index='logs-2024-01-01')
print(f"Policy status: {status}")

Learning to Rank (LTR) Plugin

Machine learning-based relevance ranking with feature stores, feature sets, and ranking models for advanced search result optimization.

class LtrClient:
    """
    Interface outline for the Learning to Rank (LTR) plugin client.

    Covers feature stores (create, get, list, delete, cache and statistics),
    features, feature sets and ranking models stored in a feature store.
    Only signatures and docstrings are listed; method bodies are not part of
    this outline.
    """

    # --- Feature stores and caches ---

    def cache_stats(self, **kwargs):
        """Retrieve cache statistics for all feature stores."""

    def clear_cache(self, store=None, **kwargs):
        """Clear the store caches."""

    def create_default_store(self, **kwargs):
        """Create the default feature store."""

    def create_store(self, store, body, **kwargs):
        """Create a new feature store."""

    def delete_default_store(self, **kwargs):
        """Delete the default feature store."""

    def delete_store(self, store, **kwargs):
        """Delete a feature store."""

    def get_store(self, store, **kwargs):
        """Get information about a feature store."""

    def list_stores(self, **kwargs):
        """List all feature stores."""

    def stats(self, store=None, **kwargs):
        """Get feature store statistics."""

    # --- Features, feature sets and ranking models ---

    def add_features_to_set(self, store, featureset, body, **kwargs):
        """Add features to an existing feature set."""

    def add_features_to_set_by_query(self, store, featureset, body, **kwargs):
        """Add features to a feature set based on a query."""

    def create_feature(self, store, name, body, **kwargs):
        """Create a new feature in a feature store."""

    def create_featureset(self, store, name, body, **kwargs):
        """Create a new feature set in a feature store."""

    def create_model(self, store, name, body, **kwargs):
        """Create a new ranking model in a feature store."""

    def create_model_from_set(self, store, featureset, name, body, **kwargs):
        """Create a ranking model from a feature set."""

    def delete_feature(self, store, name, **kwargs):
        """Delete a feature from a feature store."""

    def delete_featureset(self, store, name, **kwargs):
        """Delete a feature set from a feature store."""

    def delete_model(self, store, name, **kwargs):
        """Delete a ranking model from a feature store."""

    def get_feature(self, store, name, **kwargs):
        """Get information about a specific feature."""

    def get_featureset(self, store, name, **kwargs):
        """Get information about a specific feature set."""

    def get_model(self, store, name, **kwargs):
        """Get information about a specific ranking model."""

    def search_features(self, store, body=None, **kwargs):
        """Search for features in a feature store."""

    def search_featuresets(self, store, body=None, **kwargs):
        """Search for feature sets in a feature store."""

    def search_models(self, store, body=None, **kwargs):
        """Search for ranking models in a feature store."""

    def update_feature(self, store, name, body, **kwargs):
        """Update an existing feature in a feature store."""

    def update_featureset(self, store, name, body, **kwargs):
        """Update an existing feature set in a feature store."""

Query Data Sources Plugin

Query data source management for connecting external data sources to OpenSearch.

class QueryClient:
    """
    Interface outline for the query data sources plugin client.

    Covers data source connections: create, update, get one, list all and
    delete. Only signatures and docstrings are listed; method bodies are not
    part of this outline.
    """

    def create_data_source(self, **kwargs):
        """
        Create a new data source connection.

        No named parameters are declared; presumably the connection
        definition is passed through **kwargs (TODO confirm).
        """

    def update_data_source(self, datasource_name, **kwargs):
        """Update an existing data source connection."""

    def get_data_source(self, datasource_name, **kwargs):
        """Get information about a data source connection."""

    def get_data_sources(self, **kwargs):
        """List all data source connections."""

    def delete_data_source(self, datasource_name, **kwargs):
        """Delete a data source connection."""

Install with Tessl CLI

npx tessl i tessl/pypi-opensearch-py

docs

async-operations.md

authentication.md

core-client.md

document-modeling.md

dsl-queries.md

helper-functions.md

index.md

namespaced-apis.md

plugin-apis.md

tile.json