0
# Vectorstore Helpers
1
2
The vectorstore helpers are a high-level abstraction for building vector search applications with Elasticsearch. They provide a unified interface over retrieval strategies, embedding services, and vector similarity search, and support both dense and sparse vector approaches.
3
4
## Capabilities
5
6
### Vector Store
7
8
The main VectorStore class provides high-level document indexing and search capabilities with pluggable retrieval strategies.
9
10
```python { .api }
11
class VectorStore:
12
def __init__(
13
self,
14
client: Elasticsearch,
15
*,
16
index: str,
17
retrieval_strategy: RetrievalStrategy,
18
embedding_service: Optional[EmbeddingService] = None,
19
num_dimensions: Optional[int] = None,
20
text_field: str = "text_field",
21
vector_field: str = "vector_field",
22
metadata_mappings: Optional[Dict[str, Any]] = None,
23
user_agent: str = f"elasticsearch-py-vs/{version}",
24
custom_index_settings: Optional[Dict[str, Any]] = None,
25
):
26
"""
27
High-level vector store for document indexing and search.
28
29
Parameters:
30
- client: Elasticsearch client connection
31
- index: Index name for the vector store
32
- retrieval_strategy: Strategy for indexing and searching (see strategies)
33
- embedding_service: Service for generating embeddings (if needed)
34
- num_dimensions: Vector dimensions (for dense vector strategies)
35
- text_field: Field name for text content
36
- vector_field: Field name for vector embeddings
37
- metadata_mappings: Schema for document metadata
38
- user_agent: Custom user agent for tracking
39
- custom_index_settings: Additional index configuration
40
"""
41
42
def add_documents(
43
self,
44
documents: List[Dict[str, Any]],
45
vectors: Optional[List[List[float]]] = None,
46
ids: Optional[List[str]] = None,
47
refresh: bool = True,
48
create_index_if_not_exists: bool = True,
49
bulk_kwargs: Optional[Dict[str, Any]] = None,
50
) -> List[str]:
51
"""
52
Add documents to the vector store.
53
54
Parameters:
55
- documents: List of documents with text and metadata
56
- vectors: Pre-computed vectors (optional, depends on strategy)
57
- ids: Document IDs (auto-generated if not provided)
58
- refresh: Whether to refresh index after adding
59
- create_index_if_not_exists: Auto-create index if needed
60
- bulk_kwargs: Additional bulk indexing parameters
61
62
Returns:
63
List of document IDs that were added
64
"""
65
66
def search(
67
self,
68
query: Optional[str] = None,
69
*,
70
query_vector: Optional[List[float]] = None,
71
k: int = 4,
72
num_candidates: int = 50,
73
filter: Optional[List[Dict[str, Any]]] = None,
74
similarity_threshold: Optional[float] = None,
75
) -> List[Dict[str, Any]]:
76
"""
77
Search for similar documents.
78
79
Parameters:
80
- query: Text query string
81
- query_vector: Pre-computed query vector
82
- k: Number of results to return
83
- num_candidates: Number of candidates for kNN search
84
- filter: Filter conditions to apply
85
- similarity_threshold: Minimum similarity score
86
87
Returns:
88
List of search results with documents and scores
89
"""
90
91
def similarity_search_with_score(
92
self,
93
query: str,
94
*,
95
k: int = 4,
96
num_candidates: int = 50,
97
filter: Optional[List[Dict[str, Any]]] = None,
98
) -> List[Tuple[Dict[str, Any], float]]:
99
"""
100
Search with similarity scores included.
101
102
Parameters:
103
- query: Text query string
104
- k: Number of results to return
105
- num_candidates: Number of candidates for kNN search
106
- filter: Filter conditions to apply
107
108
Returns:
109
List of (document, score) tuples
110
"""
111
112
def max_marginal_relevance_search(
113
self,
114
query: str,
115
*,
116
k: int = 4,
117
fetch_k: int = 20,
118
lambda_mult: float = 0.5,
119
filter: Optional[List[Dict[str, Any]]] = None,
120
) -> List[Dict[str, Any]]:
121
"""
122
Maximal marginal relevance search for diverse results.
123
124
Parameters:
125
- query: Text query string
126
- k: Number of final results to return
127
- fetch_k: Number of initial candidates to fetch
128
- lambda_mult: Diversity parameter (0=max diversity, 1=max relevance)
129
- filter: Filter conditions to apply
130
131
Returns:
132
List of diverse search results
133
"""
134
135
def delete(self, ids: Optional[List[str]] = None) -> bool:
136
"""
137
Delete documents by IDs or delete entire index.
138
139
Parameters:
140
- ids: Document IDs to delete (if None, deletes entire index)
141
142
Returns:
143
True if deletion was successful
144
"""
145
146
def close(self) -> None:
147
"""Close the vector store and clean up resources."""
148
```
149
150
### Async Vector Store
151
152
Asynchronous counterpart of `VectorStore` for use with an `AsyncElasticsearch` client. Its document and search methods are coroutines and must be awaited.
153
154
```python { .api }
155
class AsyncVectorStore:
156
def __init__(
157
self,
158
client: AsyncElasticsearch,
159
*,
160
index: str,
161
retrieval_strategy: AsyncRetrievalStrategy,
162
embedding_service: Optional[AsyncEmbeddingService] = None,
163
num_dimensions: Optional[int] = None,
164
text_field: str = "text_field",
165
vector_field: str = "vector_field",
166
metadata_mappings: Optional[Dict[str, Any]] = None,
167
user_agent: str = f"elasticsearch-py-vs/{version}",
168
custom_index_settings: Optional[Dict[str, Any]] = None,
169
):
170
"""Async version of VectorStore with identical interface."""
171
172
async def add_documents(
173
self,
174
documents: List[Dict[str, Any]],
175
vectors: Optional[List[List[float]]] = None,
176
ids: Optional[List[str]] = None,
177
refresh: bool = True,
178
create_index_if_not_exists: bool = True,
179
bulk_kwargs: Optional[Dict[str, Any]] = None,
180
) -> List[str]:
181
"""Async version of add_documents."""
182
183
async def search(
184
self,
185
query: Optional[str] = None,
186
*,
187
query_vector: Optional[List[float]] = None,
188
k: int = 4,
189
num_candidates: int = 50,
190
filter: Optional[List[Dict[str, Any]]] = None,
191
similarity_threshold: Optional[float] = None,
192
) -> List[Dict[str, Any]]:
193
"""Async version of search."""
194
195
async def max_marginal_relevance_search(
196
self,
197
query: str,
198
*,
199
k: int = 4,
200
fetch_k: int = 20,
201
lambda_mult: float = 0.5,
202
filter: Optional[List[Dict[str, Any]]] = None,
203
) -> List[Dict[str, Any]]:
204
"""Async version of max_marginal_relevance_search."""
205
206
async def delete(self, ids: Optional[List[str]] = None) -> bool:
207
"""Async version of delete."""
208
209
async def close(self) -> None:
210
"""Async version of close."""
211
```
212
213
### Retrieval Strategies
214
215
Different strategies for vector indexing and search, each optimized for specific use cases.
216
217
```python { .api }
218
class RetrievalStrategy(ABC):
219
@abstractmethod
220
def es_query(
221
self,
222
*,
223
query: Optional[str],
224
query_vector: Optional[List[float]],
225
text_field: str,
226
vector_field: str,
227
k: int,
228
num_candidates: int,
229
filter: List[Dict[str, Any]] = [],
230
) -> Dict[str, Any]:
231
"""Generate Elasticsearch query for the given parameters."""
232
233
@abstractmethod
234
def es_mappings_settings(
235
self,
236
*,
237
text_field: str,
238
vector_field: str,
239
num_dimensions: Optional[int],
240
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
241
"""Generate index mappings and settings for this strategy."""
242
243
class DenseVectorStrategy(RetrievalStrategy):
244
"""Dense vector retrieval using kNN search with HNSW algorithm."""
245
246
def __init__(
247
self,
248
*,
249
distance: DistanceMetric = DistanceMetric.COSINE,
250
model_id: Optional[str] = None,
251
hybrid: bool = False,
252
):
253
"""
254
Dense vector strategy using cosine/euclidean/dot-product similarity.
255
256
Parameters:
257
- distance: Distance metric for similarity calculation
258
- model_id: Elasticsearch model ID for inference pipeline
259
- hybrid: Whether to combine with BM25 text search
260
"""
261
262
class SparseVectorStrategy(RetrievalStrategy):
263
"""Sparse vector retrieval using learned sparse encoders like ELSER."""
264
265
def __init__(self, *, model_id: str):
266
"""
267
Sparse vector strategy using learned sparse representations.
268
269
Parameters:
270
- model_id: Elasticsearch model ID for sparse vector generation
271
"""
272
273
class BM25Strategy(RetrievalStrategy):
274
"""Traditional BM25 full-text search strategy."""
275
276
def __init__(self, *, hybrid: bool = False):
277
"""
278
BM25 text search strategy.
279
280
Parameters:
281
- hybrid: Whether to combine with vector search
282
"""
283
284
class DenseVectorScriptScoreStrategy(RetrievalStrategy):
285
"""Dense vector search using script_score for custom scoring."""
286
287
def __init__(
288
self,
289
*,
290
distance: DistanceMetric = DistanceMetric.COSINE,
291
model_id: Optional[str] = None,
292
):
293
"""
294
Dense vector strategy using script_score for flexibility.
295
296
Parameters:
297
- distance: Distance metric for script_score calculation
298
- model_id: Elasticsearch model ID for inference pipeline
299
"""
300
```
301
302
### Embedding Services
303
304
Services for generating vector embeddings from text, supporting both local and remote models.
305
306
```python { .api }
307
class EmbeddingService(ABC):
308
@abstractmethod
309
def embed_documents(self, texts: List[str]) -> List[List[float]]:
310
"""Generate embeddings for multiple documents."""
311
312
@abstractmethod
313
def embed_query(self, query: str) -> List[float]:
314
"""Generate embedding for a single query."""
315
316
class ElasticsearchEmbeddings(EmbeddingService):
317
"""Use Elasticsearch deployed models for embedding generation."""
318
319
def __init__(
320
self,
321
*,
322
client: Elasticsearch,
323
model_id: str,
324
input_field: str = "text_field",
325
user_agent: str = f"elasticsearch-py-es/{version}",
326
):
327
"""
328
Elasticsearch-based embedding service.
329
330
Parameters:
331
- client: Elasticsearch client
332
- model_id: Deployed model ID in Elasticsearch
333
- input_field: Input field name for the model
334
- user_agent: Custom user agent for tracking
335
"""
336
337
def embed_documents(self, texts: List[str]) -> List[List[float]]:
338
"""Generate embeddings using Elasticsearch inference API."""
339
340
def embed_query(self, query: str) -> List[float]:
341
"""Generate query embedding using Elasticsearch inference API."""
342
343
class AsyncElasticsearchEmbeddings(AsyncEmbeddingService):
344
"""Async version of ElasticsearchEmbeddings."""
345
346
def __init__(
347
self,
348
*,
349
client: AsyncElasticsearch,
350
model_id: str,
351
input_field: str = "text_field",
352
user_agent: str = f"elasticsearch-py-es/{version}",
353
):
354
"""Async Elasticsearch embedding service."""
355
356
async def embed_documents(self, texts: List[str]) -> List[List[float]]:
357
"""Async generate embeddings for documents."""
358
359
async def embed_query(self, query: str) -> List[float]:
360
"""Async generate query embedding."""
361
```
362
363
### Distance Metrics and Utilities
364
365
Vector similarity calculations and maximal marginal relevance for diverse results.
366
367
```python { .api }
368
class DistanceMetric(str, Enum):
369
"""Elasticsearch dense vector distance metrics."""
370
371
COSINE = "COSINE" # Cosine similarity
372
DOT_PRODUCT = "DOT_PRODUCT" # Dot product similarity
373
EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE" # L2 distance
374
MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT" # Maximum inner product
375
376
def maximal_marginal_relevance(
377
query_embedding: List[float],
378
embedding_list: List[List[float]],
379
lambda_mult: float = 0.5,
380
k: int = 4,
381
) -> List[int]:
382
"""
383
Calculate maximal marginal relevance for diverse search results.
384
385
Parameters:
386
- query_embedding: Query vector
387
- embedding_list: Candidate document vectors
388
- lambda_mult: Balance between relevance (1.0) and diversity (0.0)
389
- k: Number of results to select
390
391
Returns:
392
List of indices for diverse, relevant results
393
"""
394
```
395
396
## Usage Examples
397
398
### Basic Dense Vector Search
399
400
```python
401
from elasticsearch import Elasticsearch
402
from elasticsearch.helpers.vectorstore import (
403
VectorStore,
404
DenseVectorStrategy,
405
ElasticsearchEmbeddings,
406
DistanceMetric
407
)
408
409
# Setup
410
client = Elasticsearch(['http://localhost:9200'])
411
412
# Configure embedding service
413
embedding_service = ElasticsearchEmbeddings(
414
client=client,
415
model_id="sentence-transformers__all-minilm-l6-v2"
416
)
417
418
# Configure dense vector strategy
419
strategy = DenseVectorStrategy(
420
distance=DistanceMetric.COSINE,
421
model_id="sentence-transformers__all-minilm-l6-v2"
422
)
423
424
# Create vector store
425
vector_store = VectorStore(
426
client=client,
427
index="documents",
428
retrieval_strategy=strategy,
429
embedding_service=embedding_service,
430
num_dimensions=384
431
)
432
433
# Add documents
434
documents = [
435
{"text_field": "Elasticsearch is a search engine", "metadata": {"category": "tech"}},
436
{"text_field": "Python is a programming language", "metadata": {"category": "programming"}},
437
{"text_field": "Machine learning with transformers", "metadata": {"category": "ai"}}
438
]
439
440
ids = vector_store.add_documents(documents)
441
442
# Search
443
results = vector_store.search(
444
query="search technology",
445
k=3,
446
num_candidates=10
447
)
448
449
for result in results:
450
print(f"Score: {result['_score']}, Text: {result['_source']['text_field']}")
451
```
452
453
### Hybrid Search (Dense + BM25)
454
455
```python
456
from elasticsearch.helpers.vectorstore import DenseVectorStrategy
457
458
# Hybrid strategy combining dense vectors with BM25
459
hybrid_strategy = DenseVectorStrategy(
460
distance=DistanceMetric.COSINE,
461
model_id="sentence-transformers__all-minilm-l6-v2",
462
hybrid=True # Enable hybrid search
463
)
464
465
vector_store = VectorStore(
466
client=client,
467
index="hybrid_documents",
468
retrieval_strategy=hybrid_strategy,
469
embedding_service=embedding_service,
470
num_dimensions=384
471
)
472
473
# Search combines semantic similarity with keyword matching
474
results = vector_store.search(
475
query="machine learning algorithms",
476
k=5,
477
num_candidates=20
478
)
479
```
480
481
### Sparse Vector Search with ELSER
482
483
```python
484
from elasticsearch.helpers.vectorstore import SparseVectorStrategy
485
486
# Sparse vector strategy using ELSER
487
sparse_strategy = SparseVectorStrategy(
488
model_id=".elser_model_2"
489
)
490
491
sparse_vector_store = VectorStore(
492
client=client,
493
index="sparse_documents",
494
retrieval_strategy=sparse_strategy
495
)
496
497
# Add documents (embeddings generated by ELSER in Elasticsearch)
498
sparse_vector_store.add_documents([
499
{"text_field": "Natural language processing with BERT"},
500
{"text_field": "Deep learning for computer vision"},
501
{"text_field": "Reinforcement learning algorithms"}
502
])
503
504
# Search using sparse representations
505
results = sparse_vector_store.search(
506
query="neural networks",
507
k=3
508
)
509
```
510
511
### Maximal Marginal Relevance Search
512
513
```python
514
# Get diverse results using MMR
515
diverse_results = vector_store.max_marginal_relevance_search(
516
query="artificial intelligence",
517
k=5, # Final number of results
518
fetch_k=20, # Initial candidates to consider
519
lambda_mult=0.7 # Balance: 0.7 relevance, 0.3 diversity
520
)
521
522
# Results will be relevant but diverse
523
for result in diverse_results:
524
print(f"Text: {result['_source']['text_field']}")
525
```
526
527
### Custom Metadata and Filtering
528
529
```python
530
# Define metadata schema
531
metadata_mappings = {
532
"category": {"type": "keyword"},
533
"timestamp": {"type": "date"},
534
"author": {"type": "keyword"},
535
"tags": {"type": "keyword"}
536
}
537
538
vector_store = VectorStore(
539
client=client,
540
index="documents_with_metadata",
541
retrieval_strategy=strategy,
542
embedding_service=embedding_service,
543
metadata_mappings=metadata_mappings,
544
num_dimensions=384
545
)
546
547
# Add documents with rich metadata
548
documents = [
549
{
550
"text_field": "Advanced machine learning techniques",
551
"category": "ai",
552
"author": "researcher",
553
"tags": ["ml", "deep-learning"],
554
"timestamp": "2024-01-15"
555
}
556
]
557
558
vector_store.add_documents(documents)
559
560
# Search with filters
561
filtered_results = vector_store.search(
562
query="machine learning",
563
k=5,
564
filter=[
565
{"term": {"category": "ai"}},
566
{"range": {"timestamp": {"gte": "2024-01-01"}}}
567
]
568
)
569
```
570
571
### Async Vector Store
572
573
```python
574
from elasticsearch import AsyncElasticsearch
575
from elasticsearch.helpers.vectorstore import (
576
AsyncVectorStore,
577
AsyncElasticsearchEmbeddings
578
)
579
580
async def async_vector_search():
581
# Setup async client and services
582
async_client = AsyncElasticsearch(['http://localhost:9200'])
583
584
async_embedding_service = AsyncElasticsearchEmbeddings(
585
client=async_client,
586
model_id="sentence-transformers__all-minilm-l6-v2"
587
)
588
589
async_vector_store = AsyncVectorStore(
590
client=async_client,
591
index="async_documents",
592
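retrieval_strategy=strategy,  # NOTE: must be an AsyncRetrievalStrategy instance (see the AsyncVectorStore signature), not the sync strategy from the earlier examples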
593
embedding_service=async_embedding_service,
594
num_dimensions=384
595
)
596
597
# Async operations
598
await async_vector_store.add_documents(documents)
599
results = await async_vector_store.search(query="search query", k=5)
600
601
await async_vector_store.close()
602
await async_client.close()
603
604
# Run async function
605
import asyncio
606
asyncio.run(async_vector_search())
607
```
608
609
## Advanced Use Cases
610
611
### Custom Index Settings
612
613
```python
614
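# Custom index configuration for performance
# NOTE(review): the "knn" / "knn.algo_param.*" keys below look like OpenSearch k-NN settings; in Elasticsearch, HNSW parameters (m, ef_construction) are normally set via the dense_vector field's index_options — verify before use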
615
custom_settings = {
616
"number_of_shards": 2,
617
"number_of_replicas": 1,
618
"index": {
619
"knn": True,
620
"knn.algo_param.ef_construction": 200,
621
"knn.algo_param.m": 16
622
}
623
}
624
625
vector_store = VectorStore(
626
client=client,
627
index="high_performance_vectors",
628
retrieval_strategy=strategy,
629
embedding_service=embedding_service,
630
custom_index_settings=custom_settings,
631
num_dimensions=384
632
)
633
```
634
635
### Multi-Strategy Comparison
636
637
```python
638
# Compare different retrieval strategies
639
strategies = {
640
"dense_cosine": DenseVectorStrategy(distance=DistanceMetric.COSINE),
641
"dense_euclidean": DenseVectorStrategy(distance=DistanceMetric.EUCLIDEAN_DISTANCE),
642
"sparse_elser": SparseVectorStrategy(model_id=".elser_model_2"),
643
"bm25": BM25Strategy(),
644
"hybrid": DenseVectorStrategy(hybrid=True)
645
}
646
647
results_comparison = {}
648
query = "machine learning applications"
649
650
for name, strategy in strategies.items():
651
store = VectorStore(
652
client=client,
653
index=f"comparison_{name}",
654
retrieval_strategy=strategy,
655
embedding_service=embedding_service if strategy.needs_inference() else None
656
)
657
658
results_comparison[name] = store.search(query=query, k=5)
659
```
660
661
## Types
662
663
```python { .api }
664
from typing import Any, Dict, List, Optional, Tuple, Union
665
from enum import Enum
666
667
# Core types
668
Document = Dict[str, Any]
669
Vector = List[float]
670
VectorList = List[Vector]
671
SearchResult = Dict[str, Any]
672
SearchResults = List[SearchResult]
673
674
# Strategy types
675
class DistanceMetric(str, Enum):
676
COSINE = "COSINE"
677
DOT_PRODUCT = "DOT_PRODUCT"
678
EUCLIDEAN_DISTANCE = "EUCLIDEAN_DISTANCE"
679
MAX_INNER_PRODUCT = "MAX_INNER_PRODUCT"
680
681
# Filter types
682
FilterClause = Dict[str, Any]
683
FilterList = List[FilterClause]
684
685
# MMR types
686
MMRResult = List[int] # Indices of selected documents
687
688
# Bulk operation results
689
BulkResult = List[str] # List of document IDs
690
```