Haystack: an LLM framework for building customizable, production-ready LLM applications.

Storage backends for documents and embeddings with filtering, search capabilities, and data persistence. Haystack provides document store implementations that serve as the foundation for retrieval and search operations.

Fast, memory-based document storage for development and small-scale applications.
class InMemoryDocumentStore:
    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25Okapi",
        bm25_parameters: Optional[Dict[str, Any]] = None,
        embedding_similarity_function: Literal["cosine", "dot_product", "euclidean"] = "cosine"
    ) -> None:
        """Create an in-memory document store.

        Args:
            bm25_tokenization_regex: Regular expression used to tokenize text
                for BM25 keyword ranking.
            bm25_algorithm: BM25 variant used for keyword search.
            bm25_parameters: Optional tuning parameters for the selected BM25
                variant (e.g. k1, b, epsilon, delta).
            embedding_similarity_function: Metric used to compare document
                embeddings during similarity search.
        """

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """Persist the given documents in the store.

        Args:
            documents: Document objects to store.
            policy: Strategy applied when a document ID already exists.

        Returns:
            The number of documents actually written.
        """

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Select documents whose metadata matches the given criteria.

        Args:
            filters: Mapping of filter conditions; None returns all documents.

        Returns:
            Documents satisfying the filter conditions.
        """

    def count_documents(self) -> int:
        """Return the total number of documents currently stored."""

    def delete_documents(
        self,
        document_ids: List[str]
    ) -> None:
        """Remove the documents with the given IDs from the store.

        Args:
            document_ids: IDs of the documents to delete.
        """

    def get_documents_by_id(
        self,
        document_ids: List[str]
    ) -> List[Document]:
        """Fetch documents by their IDs.

        Args:
            document_ids: IDs of the documents to retrieve.

        Returns:
            The documents retrieved for the given IDs.
        """

    def get_all_documents(self) -> List[Document]:
        """Return every document held by the store."""

    def get_embedding_count(self) -> int:
        """Return how many stored documents carry an embedding."""


# Interface definition for all document store implementations.
class DocumentStore(Protocol):
    """Structural interface that every document store implementation satisfies."""

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """Persist documents, resolving ID collisions according to *policy*."""

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Return the documents whose metadata matches *filters*."""

    def count_documents(self) -> int:
        """Return the total number of stored documents."""

    def delete_documents(self, document_ids: List[str]) -> None:
        """Remove the documents with the given IDs."""


# Control how duplicate documents are handled during writing operations.
class DuplicatePolicy(Enum):
    """Strategies for resolving document-ID collisions during writes."""

    NONE = "none"            # default: a duplicate raises an error
    SKIP = "skip"            # keep the stored document, ignore the new one
    OVERWRITE = "overwrite"  # replace the stored document with the new one
    FAIL = "fail"            # abort the entire write operation


# Define how document filtering should be applied across different metadata types.
class FilterPolicy:
    def __init__(
        self,
        # Fixed annotation: the default is None, so the type must be Optional.
        conditions: Optional[List[str]] = None,
        on_invalid_filter: Literal["raise", "ignore", "remove"] = "raise"
    ) -> None:
        """
        Initialize filter policy.

        Args:
            conditions: List of allowed filter condition operators (e.g.
                "$eq", "$in"); None means no restriction is applied.
            on_invalid_filter: Action taken when a filter uses a condition
                outside the allowed set: "raise" an error, "ignore" the
                filter, or "remove" the offending condition.
        """
def apply_filter_policy(
    filters: Dict[str, Any],
    # Fixed annotation: default is None, so the type must be Optional.
    # Quoted forward reference because FilterPolicy is defined elsewhere in the file.
    policy: Optional["FilterPolicy"] = None
) -> Dict[str, Any]:
    """
    Apply filter policy to a set of filters.

    Args:
        filters: Filter dictionary to validate.
        policy: Filter policy to apply; None applies no restrictions.

    Returns:
        Validated and processed filters.
    """


from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
# Initialize an empty in-memory document store
document_store = InMemoryDocumentStore()
# Create sample documents; the metadata keys are reused by the filtering examples below
documents = [
    Document(
        content="Python is a high-level programming language.",
        meta={"category": "programming", "language": "en", "difficulty": "beginner"}
    ),
    Document(
        content="Machine learning is a subset of artificial intelligence.",
        meta={"category": "ai", "language": "en", "difficulty": "intermediate"}
    ),
    Document(
        content="Neural networks are inspired by biological neurons.",
        meta={"category": "ai", "language": "en", "difficulty": "advanced"}
    )
]
# Write documents to the store; returns the number of documents written
written_count = document_store.write_documents(documents)
print(f"Written {written_count} documents")
# Count total documents
total_docs = document_store.count_documents()
print(f"Total documents: {total_docs}")
# Get all documents and show a preview of each (first 50 characters)
all_docs = document_store.get_all_documents()
for doc in all_docs:
    print(f"ID: {doc.id} - Content: {doc.content[:50]}...")
# Filter by single criteria
programming_docs = document_store.filter_documents(
    filters={"category": "programming"}
)
print(f"Programming documents: {len(programming_docs)}")
# Filter by multiple criteria (conditions are combined)
ai_beginner_docs = document_store.filter_documents(
    filters={"category": "ai", "difficulty": "beginner"}
)
# Advanced filtering with comparison operators
advanced_filters = {
    "difficulty": {"$in": ["intermediate", "advanced"]},
    "category": {"$ne": "programming"}
}
filtered_docs = document_store.filter_documents(filters=advanced_filters)
# Range filtering for numeric metadata
numeric_docs = [
    Document(content="Document 1", meta={"score": 85, "year": 2023}),
    Document(content="Document 2", meta={"score": 92, "year": 2022}),
    Document(content="Document 3", meta={"score": 78, "year": 2024})
]
document_store.write_documents(numeric_docs)
# Filter by score range (score >= 80)
high_score_docs = document_store.filter_documents(
    filters={"score": {"$gte": 80}}
)
# Filter by year range (2023 <= year <= 2024)
recent_docs = document_store.filter_documents(
    filters={"year": {"$gte": 2023, "$lte": 2024}}
)
from haystack.document_stores.types import DuplicatePolicy
# Create two documents that share the same ID
doc1 = Document(content="Original content", id="doc_123")
doc2 = Document(content="Updated content", id="doc_123")
# First write succeeds; the ID is not yet present in the store
document_store.write_documents([doc1], policy=DuplicatePolicy.NONE)
# SKIP leaves the stored document untouched and writes nothing
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.SKIP)
print(f"Skipped duplicates, written: {written_count}")  # Should be 0
# OVERWRITE replaces the stored document
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.OVERWRITE)
print(f"Overwritten duplicates, written: {written_count}")  # Should be 1
# Check the updated content
retrieved_doc = document_store.get_documents_by_id(["doc_123"])[0]
print(f"Updated content: {retrieved_doc.content}")  # "Updated content"

# Restored import (was fused into the comment above by extraction):
from haystack.components.embedders import OpenAIDocumentEmbedder
# Create documents with embeddings
embedder = OpenAIDocumentEmbedder()
docs_to_embed = [
    Document(content="Vector databases store high-dimensional data."),
    Document(content="Similarity search finds related documents."),
    Document(content="Embeddings capture semantic meaning.")
]
# Generate embeddings for the documents
embedding_result = embedder.run(documents=docs_to_embed)
embedded_docs = embedding_result["documents"]
# Store documents with embeddings
document_store.write_documents(embedded_docs)
# Check how many stored documents carry an embedding
embedding_count = document_store.get_embedding_count()
print(f"Documents with embeddings: {embedding_count}")
# The similarity metric is fixed per store instance at construction time
document_store_cosine = InMemoryDocumentStore(
    embedding_similarity_function="cosine"
)
document_store_dot = InMemoryDocumentStore(
    embedding_similarity_function="dot_product"
)
# Configure BM25 parameters
bm25_config = {
    "k1": 1.5,  # Term frequency saturation parameter
    "b": 0.75   # Length normalization parameter
}
document_store_bm25 = InMemoryDocumentStore(
    bm25_algorithm="BM25Okapi",
    bm25_parameters=bm25_config,
    bm25_tokenization_regex=r"\b\w+\b"  # Custom tokenization (also keeps single-character tokens)
)
# Write documents for BM25 search
text_docs = [
    Document(content="Natural language processing enables computers to understand text."),
    Document(content="Machine learning algorithms learn patterns from data."),
    Document(content="Deep learning uses neural networks with many layers.")
]
document_store_bm25.write_documents(text_docs)
# BM25 search is exposed through the retriever component, not the store itself
from haystack.components.retrievers import InMemoryBM25Retriever
bm25_retriever = InMemoryBM25Retriever(document_store=document_store_bm25)
search_results = bm25_retriever.run(query="machine learning neural networks")
for doc in search_results["documents"]:
    print(f"BM25 Score: {doc.score:.3f} - {doc.content}")
# Bulk document operations
# `time` was used below but never imported anywhere in this file.
import time

bulk_docs = [
    Document(content=f"Document {i}", meta={"batch": "bulk_1"})
    for i in range(100)
]
# Write a large batch and measure elapsed wall-clock time
start_time = time.time()
written_count = document_store.write_documents(bulk_docs)
end_time = time.time()
print(f"Wrote {written_count} documents in {end_time - start_time:.2f} seconds")
# Delete by filter (conceptual - would need custom implementation):
# first select the batch, then delete the first 50 matching IDs
batch_docs = document_store.filter_documents(filters={"batch": "bulk_1"})
doc_ids_to_delete = [doc.id for doc in batch_docs[:50]]
document_store.delete_documents(doc_ids_to_delete)
print(f"Remaining documents: {document_store.count_documents()}")
# Update document metadata (re-write with same ID and OVERWRITE policy)
doc_to_update = document_store.get_all_documents()[0]
doc_to_update.meta["updated"] = True
doc_to_update.meta["update_time"] = "2024-01-01"
document_store.write_documents([doc_to_update], policy=DuplicatePolicy.OVERWRITE)

from haystack.document_stores.types import FilterPolicy, apply_filter_policy
# Define a custom filter policy restricted to these comparison operators
policy = FilterPolicy(
    conditions=["$eq", "$ne", "$in", "$nin", "$gte", "$lte"],
    on_invalid_filter="ignore"  # Ignore invalid filters instead of raising an error
)
# Apply the policy to a raw filter dictionary
raw_filters = {
    "category": "ai",
    "invalid_operator": {"$invalid": "value"},  # uses an operator outside the allowed conditions
    "score": {"$gte": 80}
}
validated_filters = apply_filter_policy(raw_filters, policy)
print(f"Validated filters: {validated_filters}")
# Use the validated filters for a store query
filtered_docs = document_store.filter_documents(filters=validated_filters)
from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter
# Create a document processing pipeline: split documents, then write the chunks to the store
processing_pipeline = Pipeline()
# Add components
processing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence"))
processing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))
# Connect the splitter's output to the writer's input
processing_pipeline.connect("splitter.documents", "writer.documents")
# Process and store documents
large_documents = [
    Document(content="This is a long document. It contains multiple sentences. Each sentence will be split.")
]
result = processing_pipeline.run({
    "splitter": {"documents": large_documents}
})
# DocumentWriter reports "documents_written" as an int, so print it
# directly; calling len() on it would raise a TypeError.
print(f"Processed and stored {result['writer']['documents_written']} document chunks")
# Verify storage
stored_chunks = document_store.get_all_documents()
for chunk in stored_chunks[-3:]:  # Show the last 3 chunks
    print(f"Chunk: {chunk.content}")

from typing import Protocol, List, Dict, Any, Optional, Literal
from enum import Enum
from haystack import Document
class DocumentStoreError(Exception):
    """Root of the exception hierarchy for document store failures."""
class DuplicateDocumentError(DocumentStoreError):
    """Signals that duplicate document handling failed during a write."""
class FilterCondition:
    """Represents a single filter condition applied to document metadata."""

    # Metadata field the condition applies to (e.g. "category").
    field: str
    # Comparison operator (e.g. "$eq", "$in", "$gte").
    operator: str
    # Value the field is compared against.
    value: Any
class SearchResult:
    """Result of a document search operation."""

    # Documents matched by the search.
    documents: List[Document]
    # Total number of matching documents.
    total_count: int
    # Query execution time (presumably in seconds — not stated in source).
    query_time: float


# Install with Tessl CLI
npx tessl i tessl/pypi-haystack-ai