CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-elasticsearch

Python client for Elasticsearch with comprehensive API coverage and both sync and async support

Pending
Overview
Eval results
Files

vectorstore-helpers.mddocs/

Vectorstore Helpers

High-level abstraction for building vector search applications with Elasticsearch. The vectorstore helpers provide a unified interface for different retrieval strategies, embedding services, and vector similarity search patterns, supporting both dense and sparse vector approaches.

Capabilities

Vector Store

The main VectorStore class provides high-level document indexing and search capabilities with pluggable retrieval strategies.

class VectorStore:
    def __init__(
        self,
        client: Elasticsearch,
        *,
        index: str,
        retrieval_strategy: RetrievalStrategy,
        embedding_service: Optional[EmbeddingService] = None,
        num_dimensions: Optional[int] = None,
        text_field: str = "text_field",  
        vector_field: str = "vector_field",
        metadata_mappings: Optional[Dict[str, Any]] = None,
        user_agent: str = f"elasticsearch-py-vs/{version}",
        custom_index_settings: Optional[Dict[str, Any]] = None,
    ):
        """
        High-level vector store for document indexing and search.
        
        Parameters:
        - client: Elasticsearch client connection
        - index: Index name for the vector store
        - retrieval_strategy: Strategy for indexing and searching (see strategies)
        - embedding_service: Service for generating embeddings (if needed)
        - num_dimensions: Vector dimensions (for dense vector strategies)
        - text_field: Field name for text content
        - vector_field: Field name for vector embeddings
        - metadata_mappings: Schema for document metadata
        - user_agent: Custom user agent for tracking
        - custom_index_settings: Additional index configuration
        """

    def add_documents(
        self,
        documents: List[Dict[str, Any]],
        vectors: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None,
        refresh: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """
        Add documents to the vector store.
        
        Parameters:
        - documents: List of documents with text and metadata
        - vectors: Pre-computed vectors (optional, depends on strategy)
        - ids: Document IDs (auto-generated if not provided)
        - refresh: Whether to refresh index after adding
        - create_index_if_not_exists: Auto-create index if needed
        - bulk_kwargs: Additional bulk indexing parameters
        
        Returns:
        List of document IDs that were added
        """

    def search(
        self,
        query: Optional[str] = None,
        *,
        query_vector: Optional[List[float]] = None,
        k: int = 4,
        num_candidates: int = 50,
        filter: Optional[List[Dict[str, Any]]] = None,
        similarity_threshold: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.
        
        Parameters:
        - query: Text query string
        - query_vector: Pre-computed query vector
        - k: Number of results to return
        - num_candidates: Number of candidates for kNN search
        - filter: Filter conditions to apply
        - similarity_threshold: Minimum similarity score
        
        Returns:
        List of search results with documents and scores
        """

    def similarity_search_with_score(
        self,
        query: str,
        *,
        k: int = 4,
        num_candidates: int = 50,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Tuple[Dict[str, Any], float]]:
        """
        Search with similarity scores included.
        
        Parameters:
        - query: Text query string  
        - k: Number of results to return
        - num_candidates: Number of candidates for kNN search
        - filter: Filter conditions to apply
        
        Returns:
        List of (document, score) tuples
        """

    def max_marginal_relevance_search(
        self,
        query: str,
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Maximal marginal relevance search for diverse results.
        
        Parameters:
        - query: Text query string
        - k: Number of final results to return
        - fetch_k: Number of initial candidates to fetch
        - lambda_mult: Diversity parameter (0=max diversity, 1=max relevance)
        - filter: Filter conditions to apply
        
        Returns:
        List of diverse search results
        """

    def delete(self, ids: Optional[List[str]] = None) -> bool:
        """
        Delete documents by IDs or delete entire index.
        
        Parameters:
        - ids: Document IDs to delete (if None, deletes entire index)
        
        Returns:
        True if deletion was successful
        """

    def close(self) -> None:
        """Close the vector store and clean up resources."""

Async Vector Store

Asynchronous version of VectorStore for high-performance applications.

class AsyncVectorStore:
    def __init__(
        self,
        client: AsyncElasticsearch,
        *,
        index: str,
        retrieval_strategy: AsyncRetrievalStrategy,
        embedding_service: Optional[AsyncEmbeddingService] = None,
        num_dimensions: Optional[int] = None,
        text_field: str = "text_field",
        vector_field: str = "vector_field",
        metadata_mappings: Optional[Dict[str, Any]] = None,
        user_agent: str = f"elasticsearch-py-vs/{version}",
        custom_index_settings: Optional[Dict[str, Any]] = None,
    ):
        """Async version of VectorStore with identical interface."""

    async def add_documents(
        self,
        documents: List[Dict[str, Any]],
        vectors: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None,
        refresh: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """Async version of add_documents."""

    async def search(
        self,
        query: Optional[str] = None,
        *,
        query_vector: Optional[List[float]] = None,
        k: int = 4,
        num_candidates: int = 50,  
        filter: Optional[List[Dict[str, Any]]] = None,
        similarity_threshold: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Async version of search."""

    async def max_marginal_relevance_search(
        self,
        query: str,
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """Async version of max_marginal_relevance_search."""

    async def delete(self, ids: Optional[List[str]] = None) -> bool:
        """Async version of delete."""

    async def close(self) -> None:
        """Async version of close."""

Retrieval Strategies

Different strategies for vector indexing and search, each optimized for specific use cases.

class RetrievalStrategy(ABC):
    @abstractmethod
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        filter: List[Dict[str, Any]] = [],
    ) -> Dict[str, Any]:
        """Generate Elasticsearch query for the given parameters."""

    @abstractmethod  
    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Generate index mappings and settings for this strategy."""

class DenseVectorStrategy(RetrievalStrategy):
    """Dense vector retrieval using kNN search with HNSW algorithm."""
    
    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
        hybrid: bool = False,
    ):
        """
        Dense vector strategy using cosine/euclidean/dot-product similarity.
        
        Parameters:
        - distance: Distance metric for similarity calculation
        - model_id: Elasticsearch model ID for inference pipeline
        - hybrid: Whether to combine with BM25 text search
        """

class SparseVectorStrategy(RetrievalStrategy):
    """Sparse vector retrieval using learned sparse encoders like ELSER."""
    
    def __init__(self, *, model_id: str):
        """
        Sparse vector strategy using learned sparse representations.
        
        Parameters:
        - model_id: Elasticsearch model ID for sparse vector generation
        """

class BM25Strategy(RetrievalStrategy):
    """Traditional BM25 full-text search strategy."""
    
    def __init__(self, *, hybrid: bool = False):
        """
        BM25 text search strategy.
        
        Parameters:
        - hybrid: Whether to combine with vector search
        """

class DenseVectorScriptScoreStrategy(RetrievalStrategy):
    """Dense vector search using script_score for custom scoring."""
    
    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
    ):
        """
        Dense vector strategy using script_score for flexibility.
        
        Parameters:
        - distance: Distance metric for script_score calculation
        - model_id: Elasticsearch model ID for inference pipeline
        """

Embedding Services

Services for generating vector embeddings from text, supporting both local and remote models.

class EmbeddingService(ABC):
    @abstractmethod
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for multiple documents."""

    @abstractmethod
    def embed_query(self, query: str) -> List[float]:
        """Generate embedding for a single query."""

class ElasticsearchEmbeddings(EmbeddingService):
    """Use Elasticsearch deployed models for embedding generation."""
    
    def __init__(
        self,
        *,
        client: Elasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{version}",
    ):
        """
        Elasticsearch-based embedding service.
        
        Parameters:
        - client: Elasticsearch client
        - model_id: Deployed model ID in Elasticsearch
        - input_field: Input field name for the model
        - user_agent: Custom user agent for tracking
        """

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using Elasticsearch inference API."""

    def embed_query(self, query: str) -> List[float]:
        """Generate query embedding using Elasticsearch inference API."""

class AsyncElasticsearchEmbeddings(AsyncEmbeddingService):
    """Async version of ElasticsearchEmbeddings."""
    
    def __init__(
        self,
        *,
        client: AsyncElasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{version}",
    ):
        """Async Elasticsearch embedding service."""

    async def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Async generate embeddings for documents."""

    async def embed_query(self, query: str) -> List[float]:
        """Async generate query embedding."""

Distance Metrics and Utilities

Vector similarity calculations and maximal marginal relevance for diverse results.

class DistanceMetric(str, Enum):
    """Elasticsearch dense vector distance metrics."""
    
    COSINE = "COSINE"                      # Cosine similarity
    DOT_PRODUCT = "DOT_PRODUCT"            # Dot product similarity  
    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"  # L2 distance
    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"    # Maximum inner product

def maximal_marginal_relevance(
    query_embedding: List[float],
    embedding_list: List[List[float]],
    lambda_mult: float = 0.5,
    k: int = 4,
) -> List[int]:
    """
    Calculate maximal marginal relevance for diverse search results.
    
    Parameters:
    - query_embedding: Query vector
    - embedding_list: Candidate document vectors  
    - lambda_mult: Balance between relevance (1.0) and diversity (0.0)
    - k: Number of results to select
    
    Returns:
    List of indices for diverse, relevant results
    """

Usage Examples

Basic Dense Vector Search

from elasticsearch import Elasticsearch
from elasticsearch.helpers.vectorstore import (
    VectorStore, 
    DenseVectorStrategy,
    ElasticsearchEmbeddings,
    DistanceMetric
)

# Setup
client = Elasticsearch(['http://localhost:9200'])

# Configure embedding service
embedding_service = ElasticsearchEmbeddings(
    client=client,
    model_id="sentence-transformers__all-minilm-l6-v2"
)

# Configure dense vector strategy  
strategy = DenseVectorStrategy(
    distance=DistanceMetric.COSINE,
    model_id="sentence-transformers__all-minilm-l6-v2"
)

# Create vector store
vector_store = VectorStore(
    client=client,
    index="documents",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    num_dimensions=384
)

# Add documents
documents = [
    {"text_field": "Elasticsearch is a search engine", "metadata": {"category": "tech"}},
    {"text_field": "Python is a programming language", "metadata": {"category": "programming"}},
    {"text_field": "Machine learning with transformers", "metadata": {"category": "ai"}}
]

ids = vector_store.add_documents(documents)

# Search
results = vector_store.search(
    query="search technology",
    k=3,
    num_candidates=10
)

for result in results:
    print(f"Score: {result['_score']}, Text: {result['_source']['text_field']}")

Hybrid Search (Dense + BM25)

from elasticsearch.helpers.vectorstore import DenseVectorStrategy

# Hybrid strategy combining dense vectors with BM25
hybrid_strategy = DenseVectorStrategy(
    distance=DistanceMetric.COSINE,
    model_id="sentence-transformers__all-minilm-l6-v2",
    hybrid=True  # Enable hybrid search
)

vector_store = VectorStore(
    client=client,
    index="hybrid_documents", 
    retrieval_strategy=hybrid_strategy,
    embedding_service=embedding_service,
    num_dimensions=384
)

# Search combines semantic similarity with keyword matching
results = vector_store.search(
    query="machine learning algorithms",
    k=5,
    num_candidates=20
)

Sparse Vector Search with ELSER

from elasticsearch.helpers.vectorstore import SparseVectorStrategy

# Sparse vector strategy using ELSER
sparse_strategy = SparseVectorStrategy(
    model_id=".elser_model_2"
)

sparse_vector_store = VectorStore(
    client=client,
    index="sparse_documents",
    retrieval_strategy=sparse_strategy
)

# Add documents (embeddings generated by ELSER in Elasticsearch)
sparse_vector_store.add_documents([
    {"text_field": "Natural language processing with BERT"},
    {"text_field": "Deep learning for computer vision"},
    {"text_field": "Reinforcement learning algorithms"}
])

# Search using sparse representations
results = sparse_vector_store.search(
    query="neural networks",
    k=3
)

Maximal Marginal Relevance Search

# Get diverse results using MMR
diverse_results = vector_store.max_marginal_relevance_search(
    query="artificial intelligence",
    k=5,           # Final number of results
    fetch_k=20,    # Initial candidates to consider
    lambda_mult=0.7  # Balance: 0.7 relevance, 0.3 diversity
)

# Results will be relevant but diverse
for result in diverse_results:
    print(f"Text: {result['_source']['text_field']}")

Custom Metadata and Filtering

# Define metadata schema
metadata_mappings = {
    "category": {"type": "keyword"},
    "timestamp": {"type": "date"},
    "author": {"type": "keyword"},
    "tags": {"type": "keyword"}
}

vector_store = VectorStore(
    client=client,
    index="documents_with_metadata",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    metadata_mappings=metadata_mappings,
    num_dimensions=384
)

# Add documents with rich metadata
documents = [
    {
        "text_field": "Advanced machine learning techniques",
        "category": "ai",
        "author": "researcher",
        "tags": ["ml", "deep-learning"],
        "timestamp": "2024-01-15"
    }
]

vector_store.add_documents(documents)

# Search with filters
filtered_results = vector_store.search(
    query="machine learning",
    k=5,
    filter=[
        {"term": {"category": "ai"}},
        {"range": {"timestamp": {"gte": "2024-01-01"}}}
    ]
)

Async Vector Store

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers.vectorstore import (
    AsyncVectorStore,
    AsyncElasticsearchEmbeddings
)

async def async_vector_search():
    # Setup async client and services
    async_client = AsyncElasticsearch(['http://localhost:9200'])
    
    async_embedding_service = AsyncElasticsearchEmbeddings(
        client=async_client,
        model_id="sentence-transformers__all-minilm-l6-v2"
    )
    
    async_vector_store = AsyncVectorStore(
        client=async_client,
        index="async_documents",
        retrieval_strategy=strategy,
        embedding_service=async_embedding_service,
        num_dimensions=384
    )
    
    # Async operations
    await async_vector_store.add_documents(documents)
    results = await async_vector_store.search(query="search query", k=5)
    
    await async_vector_store.close()
    await async_client.close()

# Run async function
import asyncio
asyncio.run(async_vector_search())

Advanced Use Cases

Custom Index Settings

# Custom index configuration for performance
custom_settings = {
    "number_of_shards": 2,
    "number_of_replicas": 1,
    "index": {
        "knn": True,
        "knn.algo_param.ef_construction": 200,
        "knn.algo_param.m": 16
    }
}

vector_store = VectorStore(
    client=client,
    index="high_performance_vectors",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    custom_index_settings=custom_settings,
    num_dimensions=384
)

Multi-Strategy Comparison

# Compare different retrieval strategies
strategies = {
    "dense_cosine": DenseVectorStrategy(distance=DistanceMetric.COSINE),
    "dense_euclidean": DenseVectorStrategy(distance=DistanceMetric.EUCLIDEAN_DISTANCE),
    "sparse_elser": SparseVectorStrategy(model_id=".elser_model_2"),
    "bm25": BM25Strategy(),
    "hybrid": DenseVectorStrategy(hybrid=True)
}

results_comparison = {}
query = "machine learning applications"

for name, strategy in strategies.items():
    store = VectorStore(
        client=client,
        index=f"comparison_{name}",
        retrieval_strategy=strategy,
        embedding_service=embedding_service if strategy.needs_inference() else None
    )
    
    results_comparison[name] = store.search(query=query, k=5)

Types

from typing import Any, Dict, List, Optional, Tuple, Union
from enum import Enum

# Core types
Document = Dict[str, Any]
Vector = List[float]
VectorList = List[Vector]
SearchResult = Dict[str, Any]
SearchResults = List[SearchResult]

# Strategy types  
class DistanceMetric(str, Enum):
    COSINE = "COSINE"
    DOT_PRODUCT = "DOT_PRODUCT" 
    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"

# Filter types
FilterClause = Dict[str, Any]
FilterList = List[FilterClause]

# MMR types
MMRResult = List[int]  # Indices of selected documents

# Bulk operation results
BulkResult = List[str]  # List of document IDs

Install with Tessl CLI

npx tessl i tessl/pypi-elasticsearch

docs

client-operations.md

cluster-management.md

esql-operations.md

exception-handling.md

helper-functions.md

index-management.md

index.md

inference-api.md

lifecycle-management.md

machine-learning.md

query-dsl.md

search-operations.md

security-operations.md

vectorstore-helpers.md

tile.json