# Document Stores

Storage backends for documents and embeddings with filtering, search capabilities, and data persistence. Haystack provides document store implementations that serve as the foundation for retrieval and search operations.

## Capabilities

### In-Memory Document Store

Fast, memory-based document storage for development and small-scale applications.
```python { .api }
class InMemoryDocumentStore:
    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25Okapi",
        bm25_parameters: Optional[Dict[str, Any]] = None,
        embedding_similarity_function: Literal["cosine", "dot_product", "euclidean"] = "cosine"
    ) -> None:
        """
        Initialize the in-memory document store.

        Args:
            bm25_tokenization_regex: Regex pattern used for BM25 tokenization
            bm25_algorithm: BM25 algorithm variant to use
            bm25_parameters: Parameters for the BM25 algorithm (k1, b, epsilon, delta)
            embedding_similarity_function: Similarity function for embedding search
        """

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Write documents to the store.

        Args:
            documents: List of Document objects to store
            policy: How to handle duplicate documents

        Returns:
            Number of documents written
        """

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Filter documents based on metadata criteria.

        Args:
            filters: Dictionary of filter conditions

        Returns:
            List of documents matching the filters
        """

    def count_documents(self) -> int:
        """
        Count the total number of documents in the store.

        Returns:
            Total document count
        """

    def delete_documents(self, document_ids: List[str]) -> None:
        """
        Delete documents by their IDs.

        Args:
            document_ids: List of document IDs to delete
        """

    def get_documents_by_id(self, document_ids: List[str]) -> List[Document]:
        """
        Retrieve documents by their IDs.

        Args:
            document_ids: List of document IDs to retrieve

        Returns:
            List of retrieved documents
        """

    def get_all_documents(self) -> List[Document]:
        """
        Retrieve all documents from the store.

        Returns:
            List of all documents
        """

    def get_embedding_count(self) -> int:
        """
        Count documents that have embeddings.

        Returns:
            Number of documents containing embeddings
        """
```

### Document Store Protocol

Interface definition for all document store implementations.

```python { .api }
class DocumentStore(Protocol):
    """Protocol defining the interface for document stores."""

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """Write documents to the store."""

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Filter documents based on metadata."""

    def count_documents(self) -> int:
        """Count total documents."""

    def delete_documents(self, document_ids: List[str]) -> None:
        """Delete documents by ID."""
```
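
Because the protocol is structural, any class that implements these four methods can stand in for a built-in store, with no inheritance required. A minimal sketch (the class name and its dict-backed storage are illustrative, not part of Haystack):

```python
from typing import Any, Dict, List, Optional

from haystack import Document
from haystack.document_stores.types import DuplicatePolicy


class ListDocumentStore:
    """Toy store backed by a plain dict; satisfies the DocumentStore protocol."""

    def __init__(self) -> None:
        self._docs: Dict[str, Document] = {}

    def write_documents(
        self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        written = 0
        for doc in documents:
            if doc.id in self._docs and policy == DuplicatePolicy.SKIP:
                continue  # leave the existing document untouched
            self._docs[doc.id] = doc  # any other policy overwrites, for brevity
            written += 1
        return written

    def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
        docs = list(self._docs.values())
        if not filters:
            return docs
        # Only exact-match filters are supported in this sketch
        return [d for d in docs if all(d.meta.get(k) == v for k, v in filters.items())]

    def count_documents(self) -> int:
        return len(self._docs)

    def delete_documents(self, document_ids: List[str]) -> None:
        for doc_id in document_ids:
            self._docs.pop(doc_id, None)
```

Components that take a document store, such as `DocumentWriter`, should accept this class at runtime since the protocol check is structural.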

### Duplicate Handling Policies

Control how duplicate documents are handled during write operations.

```python { .api }
class DuplicatePolicy(Enum):
    """Policies for handling duplicate documents."""

    NONE = "none"            # Defer to the store's default behavior
    SKIP = "skip"            # Skip duplicate documents
    OVERWRITE = "overwrite"  # Replace existing documents
    FAIL = "fail"            # Raise an error and fail the operation
```

### Filter Policies

Define which filter conditions are allowed and how invalid filter conditions are handled.

```python { .api }
class FilterPolicy:
    def __init__(
        self,
        conditions: Optional[List[str]] = None,
        on_invalid_filter: Literal["raise", "ignore", "remove"] = "raise"
    ) -> None:
        """
        Initialize the filter policy.

        Args:
            conditions: List of allowed filter conditions
            on_invalid_filter: Action to take on invalid filters
        """

def apply_filter_policy(
    filters: Dict[str, Any],
    policy: Optional[FilterPolicy] = None
) -> Dict[str, Any]:
    """
    Apply a filter policy to a set of filters.

    Args:
        filters: Filter dictionary to validate
        policy: Filter policy to apply

    Returns:
        Validated and processed filters
    """
```

## Usage Examples

### Basic Document Store Operations

```python
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document

# Initialize the document store
document_store = InMemoryDocumentStore()

# Create sample documents
documents = [
    Document(
        content="Python is a high-level programming language.",
        meta={"category": "programming", "language": "en", "difficulty": "beginner"}
    ),
    Document(
        content="Machine learning is a subset of artificial intelligence.",
        meta={"category": "ai", "language": "en", "difficulty": "intermediate"}
    ),
    Document(
        content="Neural networks are inspired by biological neurons.",
        meta={"category": "ai", "language": "en", "difficulty": "advanced"}
    )
]

# Write documents to the store
written_count = document_store.write_documents(documents)
print(f"Written {written_count} documents")

# Count total documents
total_docs = document_store.count_documents()
print(f"Total documents: {total_docs}")

# Get all documents
all_docs = document_store.get_all_documents()
for doc in all_docs:
    print(f"ID: {doc.id} - Content: {doc.content[:50]}...")
```
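
An in-memory store's contents disappear when the process exits. Recent Haystack releases add `save_to_disk` and `load_from_disk` on `InMemoryDocumentStore` for simple JSON persistence; a minimal sketch, assuming your installed version provides these methods:

```python
# Persist documents to a JSON file (method availability depends on
# your Haystack version; treat this as an assumption, not a guarantee)
document_store.save_to_disk("doc_store.json")

# Later, or in another process: restore the store and its documents
restored_store = InMemoryDocumentStore.load_from_disk("doc_store.json")
print(f"Restored {restored_store.count_documents()} documents")
```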

### Document Filtering

```python
# Filter by a single criterion
programming_docs = document_store.filter_documents(
    filters={"category": "programming"}
)
print(f"Programming documents: {len(programming_docs)}")

# Filter by multiple criteria (no sample document matches both, so this is empty)
ai_beginner_docs = document_store.filter_documents(
    filters={"category": "ai", "difficulty": "beginner"}
)

# Advanced filtering with operators
advanced_filters = {
    "difficulty": {"$in": ["intermediate", "advanced"]},
    "category": {"$ne": "programming"}
}
filtered_docs = document_store.filter_documents(filters=advanced_filters)

# Range filtering on numeric metadata
numeric_docs = [
    Document(content="Document 1", meta={"score": 85, "year": 2023}),
    Document(content="Document 2", meta={"score": 92, "year": 2022}),
    Document(content="Document 3", meta={"score": 78, "year": 2024})
]

document_store.write_documents(numeric_docs)

# Filter by score range
high_score_docs = document_store.filter_documents(
    filters={"score": {"$gte": 80}}
)

# Filter by year range
recent_docs = document_store.filter_documents(
    filters={"year": {"$gte": 2023, "$lte": 2024}}
)
```
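
Comparison operators can also be combined into compound conditions. A sketch, assuming the store accepts Mongo-style `$and`/`$or` alongside the comparison operators used above:

```python
# Documents that are either recent or high-scoring
either_docs = document_store.filter_documents(
    filters={"$or": [
        {"year": {"$gte": 2024}},
        {"score": {"$gte": 90}}
    ]}
)

# Documents that are both recent and high-scoring
both_docs = document_store.filter_documents(
    filters={"$and": [
        {"year": {"$gte": 2023}},
        {"score": {"$gte": 80}}
    ]}
)
```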

### Duplicate Handling

```python
from haystack.document_stores.types import DuplicatePolicy

# Create two documents that share the same ID
doc1 = Document(content="Original content", id="doc_123")
doc2 = Document(content="Updated content", id="doc_123")

# Write the original, then skip the duplicate
document_store.write_documents([doc1], policy=DuplicatePolicy.NONE)
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.SKIP)
print(f"Skipped duplicates, written: {written_count}")  # Should be 0

# Overwrite the duplicate
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.OVERWRITE)
print(f"Overwrote duplicates, written: {written_count}")  # Should be 1

# Check the updated content
retrieved_doc = document_store.get_documents_by_id(["doc_123"])[0]
print(f"Updated content: {retrieved_doc.content}")  # "Updated content"
```
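
`DuplicatePolicy.FAIL` aborts the write instead of skipping or replacing. A sketch of handling that failure, assuming the `DuplicateDocumentError` described in the Types section below (in Haystack 2.x it is importable from `haystack.document_stores.errors`):

```python
from haystack.document_stores.errors import DuplicateDocumentError

try:
    document_store.write_documents([doc2], policy=DuplicatePolicy.FAIL)
except DuplicateDocumentError as err:
    # The write is rejected because doc_123 already exists
    print(f"Duplicate rejected: {err}")
```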

### Working with Embeddings

```python
from haystack.components.embedders import OpenAIDocumentEmbedder

# Create documents to embed (requires OPENAI_API_KEY in the environment)
embedder = OpenAIDocumentEmbedder()
docs_to_embed = [
    Document(content="Vector databases store high-dimensional data."),
    Document(content="Similarity search finds related documents."),
    Document(content="Embeddings capture semantic meaning.")
]

# Generate embeddings
embedding_result = embedder.run(documents=docs_to_embed)
embedded_docs = embedding_result["documents"]

# Store documents along with their embeddings
document_store.write_documents(embedded_docs)

# Check the embedding count
embedding_count = document_store.get_embedding_count()
print(f"Documents with embeddings: {embedding_count}")

# Configure the similarity function used for embedding search
document_store_cosine = InMemoryDocumentStore(
    embedding_similarity_function="cosine"
)

document_store_dot = InMemoryDocumentStore(
    embedding_similarity_function="dot_product"
)
```
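
Once embedded documents are in the store, semantic search runs through `InMemoryEmbeddingRetriever`; the query must be embedded with the text counterpart of the document embedder. A minimal sketch, assuming the same OpenAI embedding model and API key as above:

```python
from haystack.components.embedders import OpenAITextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever

# Embed the query with the text counterpart of the document embedder
text_embedder = OpenAITextEmbedder()
query_embedding = text_embedder.run(text="How do vector databases work?")["embedding"]

# Retrieve the most similar documents from the store
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
results = retriever.run(query_embedding=query_embedding, top_k=2)

for doc in results["documents"]:
    print(f"Similarity: {doc.score:.3f} - {doc.content}")
```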

### BM25 Configuration

```python
# Configure BM25 parameters
bm25_config = {
    "k1": 1.5,  # Term-frequency saturation parameter
    "b": 0.75   # Length normalization parameter
}

document_store_bm25 = InMemoryDocumentStore(
    bm25_algorithm="BM25Okapi",
    bm25_parameters=bm25_config,
    bm25_tokenization_regex=r"\b\w+\b"  # Custom tokenization
)

# Write documents for BM25 search
text_docs = [
    Document(content="Natural language processing enables computers to understand text."),
    Document(content="Machine learning algorithms learn patterns from data."),
    Document(content="Deep learning uses neural networks with many layers.")
]

document_store_bm25.write_documents(text_docs)

# BM25 search is available through InMemoryBM25Retriever
from haystack.components.retrievers import InMemoryBM25Retriever

bm25_retriever = InMemoryBM25Retriever(document_store=document_store_bm25)
search_results = bm25_retriever.run(query="machine learning neural networks")

for doc in search_results["documents"]:
    print(f"BM25 Score: {doc.score:.3f} - {doc.content}")
```

### Document Management Operations

```python
import time

# Bulk document operations
bulk_docs = [
    Document(content=f"Document {i}", meta={"batch": "bulk_1"})
    for i in range(100)
]

# Write a large batch and time it
start_time = time.time()
written_count = document_store.write_documents(bulk_docs)
end_time = time.time()
print(f"Wrote {written_count} documents in {end_time - start_time:.2f} seconds")

# "Delete by filter": filter first, then delete the matching IDs
batch_docs = document_store.filter_documents(filters={"batch": "bulk_1"})
doc_ids_to_delete = [doc.id for doc in batch_docs[:50]]
document_store.delete_documents(doc_ids_to_delete)

print(f"Remaining documents: {document_store.count_documents()}")

# Update document metadata (re-write with the same ID)
doc_to_update = document_store.get_all_documents()[0]
doc_to_update.meta["updated"] = True
doc_to_update.meta["update_time"] = "2024-01-01"

document_store.write_documents([doc_to_update], policy=DuplicatePolicy.OVERWRITE)
```
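
For very large ingestions, writing in fixed-size chunks keeps the memory of each call bounded and makes progress observable. A small helper sketch, continuing the example above (this function is illustrative, not part of the Haystack API):

```python
def write_in_chunks(store, docs, chunk_size=1000):
    """Write documents in fixed-size chunks; returns the total written."""
    total = 0
    for start in range(0, len(docs), chunk_size):
        chunk = docs[start:start + chunk_size]
        total += store.write_documents(chunk, policy=DuplicatePolicy.OVERWRITE)
        print(f"Wrote {total}/{len(docs)} documents so far")
    return total

write_in_chunks(document_store, bulk_docs, chunk_size=25)
```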

### Custom Filter Policies

```python
from haystack.document_stores.types import FilterPolicy, apply_filter_policy

# Define a custom filter policy
policy = FilterPolicy(
    conditions=["$eq", "$ne", "$in", "$nin", "$gte", "$lte"],
    on_invalid_filter="ignore"  # Ignore invalid filters instead of raising an error
)

# Apply the policy to a set of filters
raw_filters = {
    "category": "ai",
    "invalid_operator": {"$invalid": "value"},
    "score": {"$gte": 80}
}

validated_filters = apply_filter_policy(raw_filters, policy)
print(f"Validated filters: {validated_filters}")

# Use the validated filters
filtered_docs = document_store.filter_documents(filters=validated_filters)
```

### Pipeline Integration

```python
from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter

# Create a document processing pipeline that writes to the store
processing_pipeline = Pipeline()

# Add components
processing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence"))
processing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))

# Connect components
processing_pipeline.connect("splitter.documents", "writer.documents")

# Process and store documents
large_documents = [
    Document(content="This is a long document. It contains multiple sentences. Each sentence will be split.")
]

result = processing_pipeline.run({
    "splitter": {"documents": large_documents}
})

# DocumentWriter reports the number of documents it wrote as an integer
print(f"Processed and stored {result['writer']['documents_written']} document chunks")

# Verify storage
stored_chunks = document_store.get_all_documents()
for chunk in stored_chunks[-3:]:  # Show the last 3 chunks
    print(f"Chunk: {chunk.content}")
```

## Types

```python { .api }
from typing import Protocol, List, Dict, Any, Optional, Literal
from enum import Enum
from haystack import Document

class DocumentStoreError(Exception):
    """Base exception for document store operations."""

class DuplicateDocumentError(DocumentStoreError):
    """Raised when duplicate document handling fails."""

class FilterCondition:
    """Represents a single filter condition."""
    field: str
    operator: str
    value: Any

class SearchResult:
    """Result of a document search operation."""
    documents: List[Document]
    total_count: int
    query_time: float
```