Python client for Elasticsearch with comprehensive API coverage and both sync and async support
## Overview
High-level abstraction for building vector search applications with Elasticsearch. The vectorstore helpers provide a unified interface for different retrieval strategies, embedding services, and vector similarity search patterns, supporting both dense and sparse vector approaches.
The main VectorStore class provides high-level document indexing and search capabilities with pluggable retrieval strategies.
class VectorStore:
    """High-level vector store backed by an Elasticsearch index.

    Indexing and querying are delegated to a pluggable retrieval strategy;
    an optional embedding service turns text into vectors when the strategy
    needs client-side embeddings.
    """

    def __init__(
        self,
        client: Elasticsearch,
        *,
        index: str,
        retrieval_strategy: RetrievalStrategy,
        embedding_service: Optional[EmbeddingService] = None,
        num_dimensions: Optional[int] = None,
        text_field: str = "text_field",
        vector_field: str = "vector_field",
        metadata_mappings: Optional[Dict[str, Any]] = None,
        user_agent: str = f"elasticsearch-py-vs/{version}",
        custom_index_settings: Optional[Dict[str, Any]] = None,
    ):
        """Create a vector store.

        Parameters:
        - client: connected Elasticsearch client
        - index: name of the backing index
        - retrieval_strategy: strategy used for indexing and search
        - embedding_service: optional service that generates embeddings
        - num_dimensions: dimensionality of dense vectors, when applicable
        - text_field: document field holding the raw text
        - vector_field: document field holding the embedding
        - metadata_mappings: optional mapping definitions for metadata fields
        - user_agent: user-agent string sent with requests
        - custom_index_settings: extra settings applied at index creation
        """

    def add_documents(
        self,
        documents: List[Dict[str, Any]],
        vectors: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None,
        refresh: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """Index documents and return the IDs under which they were stored.

        Parameters:
        - documents: documents containing text and optional metadata
        - vectors: pre-computed embeddings, when the strategy accepts them
        - ids: explicit document IDs; generated automatically when omitted
        - refresh: refresh the index after the bulk operation
        - create_index_if_not_exists: create the backing index on demand
        - bulk_kwargs: extra keyword arguments for the bulk helper

        Returns:
        The list of document IDs that were added.
        """

    def search(
        self,
        query: Optional[str] = None,
        *,
        query_vector: Optional[List[float]] = None,
        k: int = 4,
        num_candidates: int = 50,
        filter: Optional[List[Dict[str, Any]]] = None,
        similarity_threshold: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Return up to ``k`` documents similar to the query.

        Parameters:
        - query: free-text query
        - query_vector: pre-computed query embedding
        - k: number of hits to return
        - num_candidates: candidate pool size for kNN search
        - filter: list of Elasticsearch filter clauses
        - similarity_threshold: drop hits scoring below this value

        Returns:
        Search hits with their documents and scores.
        """

    def similarity_search_with_score(
        self,
        query: str,
        *,
        k: int = 4,
        num_candidates: int = 50,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Tuple[Dict[str, Any], float]]:
        """Search and return ``(document, score)`` tuples.

        Parameters:
        - query: free-text query
        - k: number of hits to return
        - num_candidates: candidate pool size for kNN search
        - filter: list of Elasticsearch filter clauses
        """

    def max_marginal_relevance_search(
        self,
        query: str,
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """Search with maximal marginal relevance to diversify results.

        Parameters:
        - query: free-text query
        - k: number of results returned after re-ranking
        - fetch_k: number of candidates fetched before re-ranking
        - lambda_mult: relevance/diversity trade-off (1.0 = relevance only,
          0.0 = diversity only)
        - filter: list of Elasticsearch filter clauses

        Returns:
        A diverse subset of the fetched candidates.
        """

    def delete(self, ids: Optional[List[str]] = None) -> bool:
        """Delete documents, or the entire index when ``ids`` is None.

        Parameters:
        - ids: document IDs to delete; None removes the whole index

        Returns:
        True when the deletion succeeded.
        """

    def close(self) -> None:
        """Release resources held by the vector store."""


# Asynchronous version of VectorStore for high-performance applications.
class AsyncVectorStore:
    """Asynchronous counterpart of ``VectorStore``.

    Same interface, but every operation is a coroutine executed against an
    ``AsyncElasticsearch`` client.
    """

    def __init__(
        self,
        client: AsyncElasticsearch,
        *,
        index: str,
        retrieval_strategy: AsyncRetrievalStrategy,
        embedding_service: Optional[AsyncEmbeddingService] = None,
        num_dimensions: Optional[int] = None,
        text_field: str = "text_field",
        vector_field: str = "vector_field",
        metadata_mappings: Optional[Dict[str, Any]] = None,
        user_agent: str = f"elasticsearch-py-vs/{version}",
        custom_index_settings: Optional[Dict[str, Any]] = None,
    ):
        """Create the async store; parameters mirror ``VectorStore``."""

    async def add_documents(
        self,
        documents: List[Dict[str, Any]],
        vectors: Optional[List[List[float]]] = None,
        ids: Optional[List[str]] = None,
        refresh: bool = True,
        create_index_if_not_exists: bool = True,
        bulk_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[str]:
        """Asynchronously index documents and return their IDs."""

    async def search(
        self,
        query: Optional[str] = None,
        *,
        query_vector: Optional[List[float]] = None,
        k: int = 4,
        num_candidates: int = 50,
        filter: Optional[List[Dict[str, Any]]] = None,
        similarity_threshold: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Asynchronously search for similar documents."""

    async def max_marginal_relevance_search(
        self,
        query: str,
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """Asynchronously run maximal-marginal-relevance search."""

    async def delete(self, ids: Optional[List[str]] = None) -> bool:
        """Asynchronously delete documents, or the index when ``ids`` is None."""

    async def close(self) -> None:
        """Asynchronously release resources."""


# Different strategies for vector indexing and search, each optimized for
# specific use cases.
class RetrievalStrategy(ABC):
    """Interface for strategies that build search queries and index schemas."""

    @abstractmethod
    def es_query(
        self,
        *,
        query: Optional[str],
        query_vector: Optional[List[float]],
        text_field: str,
        vector_field: str,
        k: int,
        num_candidates: int,
        # None means "no filters"; a mutable [] default would be shared
        # across all calls and is a classic Python pitfall.
        filter: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Any]:
        """Build the Elasticsearch query body for the given parameters.

        Parameters:
        - query: free-text query, when the strategy uses text
        - query_vector: pre-computed query embedding, when available
        - text_field: document field holding the raw text
        - vector_field: document field holding the embedding
        - k: number of hits to return
        - num_candidates: candidate pool size for kNN search
        - filter: optional list of Elasticsearch filter clauses

        Returns:
        The query body as a dict.
        """

    @abstractmethod
    def es_mappings_settings(
        self,
        *,
        text_field: str,
        vector_field: str,
        num_dimensions: Optional[int],
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Return ``(mappings, settings)`` used to create the backing index.

        Parameters:
        - text_field: document field holding the raw text
        - vector_field: document field holding the embedding
        - num_dimensions: dense-vector dimensionality, when applicable
        """
class DenseVectorStrategy(RetrievalStrategy):
    """Dense-vector retrieval via approximate kNN (HNSW)."""

    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
        hybrid: bool = False,
    ):
        """Configure dense-vector retrieval.

        Parameters:
        - distance: metric used to compare vectors
        - model_id: optional Elasticsearch inference model ID
        - hybrid: also run BM25 text search and combine the results
        """
class SparseVectorStrategy(RetrievalStrategy):
    """Retrieval via learned sparse encoders such as ELSER."""

    def __init__(self, *, model_id: str):
        """Configure sparse retrieval.

        Parameters:
        - model_id: Elasticsearch model ID that produces sparse vectors
        """
class BM25Strategy(RetrievalStrategy):
    """Classic BM25 full-text retrieval."""

    def __init__(self, *, hybrid: bool = False):
        """Configure BM25 retrieval.

        Parameters:
        - hybrid: also run vector search and combine the results
        """
class DenseVectorScriptScoreStrategy(RetrievalStrategy):
    """Dense-vector retrieval scored with a ``script_score`` query."""

    def __init__(
        self,
        *,
        distance: DistanceMetric = DistanceMetric.COSINE,
        model_id: Optional[str] = None,
    ):
        """Configure script-score dense retrieval.

        Parameters:
        - distance: metric evaluated inside the script_score calculation
        - model_id: optional Elasticsearch inference model ID
        """


# Services for generating vector embeddings from text, supporting both local
# and remote models.
class EmbeddingService(ABC):
    """Interface for services that turn text into vector embeddings."""

    @abstractmethod
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding per input document."""

    @abstractmethod
    def embed_query(self, query: str) -> List[float]:
        """Return the embedding of a single query string."""
class ElasticsearchEmbeddings(EmbeddingService):
    """Embedding service backed by a model deployed in Elasticsearch."""

    def __init__(
        self,
        *,
        client: Elasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{version}",
    ):
        """Configure the service.

        Parameters:
        - client: connected Elasticsearch client
        - model_id: ID of the deployed embedding model
        - input_field: field name the model expects its input text in
        - user_agent: user-agent string sent with requests
        """

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed each text via the Elasticsearch inference API."""

    def embed_query(self, query: str) -> List[float]:
        """Embed a single query via the Elasticsearch inference API."""
class AsyncElasticsearchEmbeddings(AsyncEmbeddingService):
    """Asynchronous counterpart of ``ElasticsearchEmbeddings``."""

    def __init__(
        self,
        *,
        client: AsyncElasticsearch,
        model_id: str,
        input_field: str = "text_field",
        user_agent: str = f"elasticsearch-py-es/{version}",
    ):
        """Configure the async service; parameters mirror
        ``ElasticsearchEmbeddings``.
        """

    async def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Asynchronously embed each input text."""

    async def embed_query(self, query: str) -> List[float]:
        """Asynchronously embed a single query."""


# Vector similarity calculations and maximal marginal relevance for diverse
# results.
class DistanceMetric(str, Enum):
    """Distance metrics accepted for Elasticsearch dense-vector fields."""

    COSINE = "COSINE"  # angle-based (cosine) similarity
    DOT_PRODUCT = "DOT_PRODUCT"  # raw dot product
    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"  # straight-line (L2) distance
    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"  # inner product without normalization
def maximal_marginal_relevance(
    query_embedding: List[float],
    embedding_list: List[List[float]],
    lambda_mult: float = 0.5,
    k: int = 4,
) -> List[int]:
    """Select up to ``k`` diverse yet relevant candidates with MMR.

    Greedily picks the candidate maximizing
    ``lambda_mult * sim(query, cand) - (1 - lambda_mult) * max_sim(cand, selected)``
    using cosine similarity, so ``lambda_mult = 1.0`` favors pure relevance and
    ``0.0`` pure diversity. The original stub declared ``-> List[int]`` but
    returned nothing; this implements the documented contract.

    Parameters:
    - query_embedding: query vector
    - embedding_list: candidate document vectors
    - lambda_mult: relevance/diversity trade-off in [0, 1]
    - k: maximum number of indices to return

    Returns:
    Indices into ``embedding_list`` in selection order.
    """

    def _cosine(a: List[float], b: List[float]) -> float:
        # Cosine similarity; a zero vector is treated as orthogonal to all.
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        if norm_a == 0.0 or norm_b == 0.0:
            return 0.0
        return dot / (norm_a * norm_b)

    if k <= 0 or not embedding_list:
        return []
    # Relevance of every candidate to the query, computed once.
    relevance = [_cosine(query_embedding, emb) for emb in embedding_list]
    selected: List[int] = []
    remaining = list(range(len(embedding_list)))
    while remaining and len(selected) < k:
        best, best_score = remaining[0], float("-inf")
        for idx in remaining:
            # Penalty: similarity to the closest already-selected item.
            redundancy = max(
                (_cosine(embedding_list[idx], embedding_list[j]) for j in selected),
                default=0.0,
            )
            score = lambda_mult * relevance[idx] - (1.0 - lambda_mult) * redundancy
            if score > best_score:
                best, best_score = idx, score
        selected.append(best)
        remaining.remove(best)
    return selected


from elasticsearch import Elasticsearch
from elasticsearch.helpers.vectorstore import (
    VectorStore,
    DenseVectorStrategy,
    ElasticsearchEmbeddings,
    DistanceMetric,
)

# Connect to a local cluster.
client = Elasticsearch(['http://localhost:9200'])

# Embedding service backed by a model deployed in Elasticsearch.
embedding_service = ElasticsearchEmbeddings(
    client=client,
    model_id="sentence-transformers__all-minilm-l6-v2",
)

# Dense kNN retrieval with cosine similarity.
strategy = DenseVectorStrategy(
    distance=DistanceMetric.COSINE,
    model_id="sentence-transformers__all-minilm-l6-v2",
)

# Assemble the vector store from client, strategy, and embedding service.
vector_store = VectorStore(
    client=client,
    index="documents",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    num_dimensions=384,
)

# Index a few documents carrying metadata.
documents = [
    {"text_field": "Elasticsearch is a search engine", "metadata": {"category": "tech"}},
    {"text_field": "Python is a programming language", "metadata": {"category": "programming"}},
    {"text_field": "Machine learning with transformers", "metadata": {"category": "ai"}},
]
ids = vector_store.add_documents(documents)

# Run a similarity search and print each hit.
results = vector_store.search(query="search technology", k=3, num_candidates=10)
for result in results:
    print(f"Score: {result['_score']}, Text: {result['_source']['text_field']}")

from elasticsearch.helpers.vectorstore import DenseVectorStrategy
# Hybrid retrieval: dense vectors fused with BM25 keyword matching.
hybrid_strategy = DenseVectorStrategy(
    distance=DistanceMetric.COSINE,
    model_id="sentence-transformers__all-minilm-l6-v2",
    hybrid=True,
)

vector_store = VectorStore(
    client=client,
    index="hybrid_documents",
    retrieval_strategy=hybrid_strategy,
    embedding_service=embedding_service,
    num_dimensions=384,
)

# One query, scored by both semantic similarity and keyword relevance.
results = vector_store.search(query="machine learning algorithms", k=5, num_candidates=20)

from elasticsearch.helpers.vectorstore import SparseVectorStrategy
# Sparse retrieval with the ELSER learned-sparse model.
sparse_strategy = SparseVectorStrategy(model_id=".elser_model_2")

sparse_vector_store = VectorStore(
    client=client,
    index="sparse_documents",
    retrieval_strategy=sparse_strategy,
)

# ELSER expands these documents into sparse vectors inside Elasticsearch,
# so no client-side embedding service is needed.
sparse_vector_store.add_documents([
    {"text_field": "Natural language processing with BERT"},
    {"text_field": "Deep learning for computer vision"},
    {"text_field": "Reinforcement learning algorithms"},
])

# Query against the sparse representations.
results = sparse_vector_store.search(query="neural networks", k=3)

# Get diverse results using MMR
diverse_results = vector_store.max_marginal_relevance_search(
    query="artificial intelligence",
    k=5,              # results returned after re-ranking
    fetch_k=20,       # candidate pool considered by MMR
    lambda_mult=0.7,  # weight relevance at 0.7, diversity at 0.3
)

# Hits stay on-topic while avoiding near-duplicates.
for result in diverse_results:
    print(f"Text: {result['_source']['text_field']}")

# Define metadata schema
metadata_mappings = {
    "category": {"type": "keyword"},
    "timestamp": {"type": "date"},
    "author": {"type": "keyword"},
    "tags": {"type": "keyword"},
}

vector_store = VectorStore(
    client=client,
    index="documents_with_metadata",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    metadata_mappings=metadata_mappings,
    num_dimensions=384,
)

# Index a document carrying rich metadata fields.
documents = [
    {
        "text_field": "Advanced machine learning techniques",
        "category": "ai",
        "author": "researcher",
        "tags": ["ml", "deep-learning"],
        "timestamp": "2024-01-15",
    }
]
vector_store.add_documents(documents)

# Constrain the search with term and range filter clauses.
filtered_results = vector_store.search(
    query="machine learning",
    k=5,
    filter=[
        {"term": {"category": "ai"}},
        {"range": {"timestamp": {"gte": "2024-01-01"}}},
    ],
)

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers.vectorstore import (
    AsyncVectorStore,
    AsyncElasticsearchEmbeddings,
)


async def async_vector_search():
    # Async client and async embedding service.
    async_client = AsyncElasticsearch(['http://localhost:9200'])
    async_embedding_service = AsyncElasticsearchEmbeddings(
        client=async_client,
        model_id="sentence-transformers__all-minilm-l6-v2",
    )
    async_vector_store = AsyncVectorStore(
        client=async_client,
        index="async_documents",
        retrieval_strategy=strategy,
        embedding_service=async_embedding_service,
        num_dimensions=384,
    )

    # Same operations as the sync store, awaited.
    await async_vector_store.add_documents(documents)
    results = await async_vector_store.search(query="search query", k=5)
    await async_vector_store.close()
    await async_client.close()


import asyncio

asyncio.run(async_vector_search())

# Custom index configuration for performance
custom_settings = {
    "number_of_shards": 2,
    "number_of_replicas": 1,
    "index": {
        "knn": True,
        "knn.algo_param.ef_construction": 200,
        "knn.algo_param.m": 16,
    },
}

# The settings are applied when the backing index is created.
vector_store = VectorStore(
    client=client,
    index="high_performance_vectors",
    retrieval_strategy=strategy,
    embedding_service=embedding_service,
    custom_index_settings=custom_settings,
    num_dimensions=384,
)

# Compare different retrieval strategies
strategies = {
    "dense_cosine": DenseVectorStrategy(distance=DistanceMetric.COSINE),
    "dense_euclidean": DenseVectorStrategy(distance=DistanceMetric.EUCLIDEAN_DISTANCE),
    "sparse_elser": SparseVectorStrategy(model_id=".elser_model_2"),
    "bm25": BM25Strategy(),
    "hybrid": DenseVectorStrategy(hybrid=True),
}

results_comparison = {}
query = "machine learning applications"

# Build one store per strategy and collect its top hits for the same query.
for name, strategy in strategies.items():
    store = VectorStore(
        client=client,
        index=f"comparison_{name}",
        retrieval_strategy=strategy,
        # Only strategies that embed client-side need the embedding service.
        embedding_service=embedding_service if strategy.needs_inference() else None,
    )
    results_comparison[name] = store.search(query=query, k=5)

from typing import Any, Dict, List, Optional, Tuple, Union
from enum import Enum

# Core document/vector aliases.
Document = Dict[str, Any]
Vector = List[float]
VectorList = List[Vector]
SearchResult = Dict[str, Any]
SearchResults = List[SearchResult]


class DistanceMetric(str, Enum):
    """Dense-vector distance metrics."""

    COSINE = "COSINE"
    DOT_PRODUCT = "DOT_PRODUCT"
    EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
    MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"


# Filter clause aliases.
FilterClause = Dict[str, Any]
FilterList = List[FilterClause]

# MMR returns indices of the selected documents.
MMRResult = List[int]

# Bulk indexing returns the stored document IDs.
BulkResult = List[str]

# Install with Tessl CLI
Install with the Tessl CLI: `npx tessl i tessl/pypi-elasticsearch`