CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-haystack-ai

LLM framework to build customizable, production-ready LLM applications.

Pending
Overview
Eval results
Files

docs/document-stores.md

Document Stores

Storage backends for documents and embeddings with filtering, search capabilities, and data persistence. Haystack provides document store implementations that serve as the foundation for retrieval and search operations.

Capabilities

In-Memory Document Store

Fast, memory-based document storage for development and small-scale applications.

class InMemoryDocumentStore:
    """
    Memory-backed document store (signature-only reference listing).

    The methods below describe the public interface; this listing contains
    no implementation bodies.
    """

    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25Okapi",
        bm25_parameters: Optional[Dict[str, Any]] = None,
        embedding_similarity_function: Literal["cosine", "dot_product", "euclidean"] = "cosine"
    ) -> None:
        """
        Initialize in-memory document store.

        Args:
            bm25_tokenization_regex: Regex pattern used to tokenize text for
                BM25. The default pattern only matches tokens made of two or
                more word characters.
            bm25_algorithm: BM25 variant to use; one of "BM25Okapi", "BM25L"
                or "BM25Plus". Defaults to "BM25Okapi".
            bm25_parameters: Optional parameters for the BM25 algorithm
                (k1, b, epsilon, delta). Defaults to None.
            embedding_similarity_function: Similarity function for embedding
                search; one of "cosine", "dot_product" or "euclidean".
                Defaults to "cosine".
        """

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Write documents to the store.

        Args:
            documents: List of Document objects to store
            policy: How to handle documents that duplicate ones already in
                the store. Defaults to DuplicatePolicy.NONE.

        Returns:
            Number of documents written
        """

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Filter documents based on metadata criteria.

        Args:
            filters: Dictionary of filter conditions. Defaults to None.
                NOTE(review): behavior for None is not shown in this listing;
                presumably every document is returned - confirm.

        Returns:
            List of documents matching the filters
        """

    def count_documents(self) -> int:
        """
        Count total number of documents in the store.

        Returns:
            Total document count
        """

    def delete_documents(
        self,
        document_ids: List[str]
    ) -> None:
        """
        Delete documents by their IDs.

        Args:
            document_ids: List of document IDs to delete
                NOTE(review): handling of IDs that are not in the store is not
                shown in this listing - confirm.
        """

    def get_documents_by_id(
        self,
        document_ids: List[str]
    ) -> List[Document]:
        """
        Retrieve documents by their IDs.

        Args:
            document_ids: List of document IDs to retrieve

        Returns:
            List of retrieved documents
        """

    def get_all_documents(self) -> List[Document]:
        """
        Retrieve all documents from the store.

        Returns:
            List of all documents
        """

    def get_embedding_count(self) -> int:
        """
        Count documents with embeddings.

        Returns:
            Number of documents containing embeddings
        """

Document Store Protocol

Interface definition for all document store implementations.

class DocumentStore(Protocol):
    """
    Protocol defining the interface for document stores.

    Only signatures are declared here; the four methods below are the members
    of this protocol.
    """

    def write_documents(
        self,
        documents: List[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Write documents to the store.

        Args:
            documents: List of Document objects to store
            policy: How to handle duplicate documents. Defaults to
                DuplicatePolicy.NONE.

        Returns:
            An integer; presumably the number of documents written, as in
            InMemoryDocumentStore.write_documents - confirm.
        """

    def filter_documents(
        self,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """
        Filter documents based on metadata.

        Args:
            filters: Optional dictionary of filter conditions. Defaults to None.

        Returns:
            List of documents matching the filters
        """

    def count_documents(self) -> int:
        """
        Count total documents.

        Returns:
            Total document count
        """

    def delete_documents(self, document_ids: List[str]) -> None:
        """
        Delete documents by ID.

        Args:
            document_ids: List of document IDs to delete
        """

Duplicate Handling Policies

Control how duplicate documents are handled during writing operations.

class DuplicatePolicy(Enum):
    """Policies for handling duplicate documents."""

    # NOTE(review): NONE and FAIL were previously both described as erroring,
    # which made them indistinguishable. Upstream Haystack documentation
    # describes NONE as "use the document store's default policy" - confirm
    # against the installed version.
    NONE = "none"           # Defer to the document store's default duplicate handling (TODO confirm)
    SKIP = "skip"           # Keep the existing document; the duplicate is not written
    OVERWRITE = "overwrite" # Replace the existing document with the new one
    FAIL = "fail"           # Fail the write when a duplicate is encountered

Filter Policies

Define how document filtering should be applied across different metadata types.

class FilterPolicy:
    # NOTE(review): upstream haystack.document_stores.types.FilterPolicy
    # appears to be an Enum (REPLACE / MERGE) rather than a class with this
    # constructor - verify this signature against the installed package.
    def __init__(
        self,
        conditions: Optional[List[str]] = None,
        on_invalid_filter: Literal["raise", "ignore", "remove"] = "raise"
    ) -> None:
        """
        Initialize filter policy.

        Args:
            conditions: List of allowed filter conditions. Defaults to None.
                NOTE(review): the meaning of None (no restriction vs. nothing
                allowed) is not shown in this listing - confirm.
            on_invalid_filter: Action to take on invalid filters; one of
                "raise", "ignore" or "remove". Defaults to "raise".
        """

def apply_filter_policy(
    filters: Dict[str, Any],
    policy: Optional[FilterPolicy] = None
) -> Dict[str, Any]:
    """
    Apply filter policy to a set of filters.

    Args:
        filters: Filter dictionary to validate
        policy: Filter policy to apply. Defaults to None.
            NOTE(review): behavior when no policy is given is not shown in
            this listing - confirm.

    Returns:
        Validated and processed filters
    """

Usage Examples

Basic Document Store Operations

from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document

# Initialize document store with its default settings.
# The later examples on this page reuse `document_store` and `Document`.
document_store = InMemoryDocumentStore()

# Create sample documents; `meta` carries the metadata used for filtering later
documents = [
    Document(
        content="Python is a high-level programming language.",
        meta={"category": "programming", "language": "en", "difficulty": "beginner"}
    ),
    Document(
        content="Machine learning is a subset of artificial intelligence.",
        meta={"category": "ai", "language": "en", "difficulty": "intermediate"}
    ),
    Document(
        content="Neural networks are inspired by biological neurons.",
        meta={"category": "ai", "language": "en", "difficulty": "advanced"}
    )
]

# Write documents to store; the return value is the number of documents written
written_count = document_store.write_documents(documents)
print(f"Written {written_count} documents")

# Count total documents
total_docs = document_store.count_documents()
print(f"Total documents: {total_docs}")

# Get all documents and print a 50-character preview of each.
# No IDs were passed above, so doc.id is whatever Document assigned
# (presumably auto-generated - confirm).
all_docs = document_store.get_all_documents()
for doc in all_docs:
    print(f"ID: {doc.id} - Content: {doc.content[:50]}...")

Document Filtering

# Haystack 2.x filters are explicit comparison dictionaries of the form
# {"field": ..., "operator": ..., "value": ...}. Metadata keys are addressed
# as "meta.<key>". Several comparisons are combined with a logical operator
# ("AND", "OR", "NOT") and a "conditions" list. The older {"key": value} and
# "$op" shorthand is rejected by InMemoryDocumentStore.filter_documents.

# Filter by single criteria
programming_docs = document_store.filter_documents(
    filters={"field": "meta.category", "operator": "==", "value": "programming"}
)
print(f"Programming documents: {len(programming_docs)}")

# Filter by multiple criteria (every condition must match)
ai_beginner_docs = document_store.filter_documents(
    filters={
        "operator": "AND",
        "conditions": [
            {"field": "meta.category", "operator": "==", "value": "ai"},
            {"field": "meta.difficulty", "operator": "==", "value": "beginner"},
        ],
    }
)

# Advanced filtering with operators ("in" for membership, "!=" for inequality)
advanced_filters = {
    "operator": "AND",
    "conditions": [
        {"field": "meta.difficulty", "operator": "in", "value": ["intermediate", "advanced"]},
        {"field": "meta.category", "operator": "!=", "value": "programming"},
    ],
}
filtered_docs = document_store.filter_documents(filters=advanced_filters)

# Range filtering for numeric metadata
numeric_docs = [
    Document(content="Document 1", meta={"score": 85, "year": 2023}),
    Document(content="Document 2", meta={"score": 92, "year": 2022}),
    Document(content="Document 3", meta={"score": 78, "year": 2024})
]

document_store.write_documents(numeric_docs)

# Filter by score range (lower bound only)
high_score_docs = document_store.filter_documents(
    filters={"field": "meta.score", "operator": ">=", "value": 80}
)

# Filter by year range: a closed interval is two comparisons joined by AND
recent_docs = document_store.filter_documents(
    filters={
        "operator": "AND",
        "conditions": [
            {"field": "meta.year", "operator": ">=", "value": 2023},
            {"field": "meta.year", "operator": "<=", "value": 2024},
        ],
    }
)

Duplicate Handling

from haystack.document_stores.types import DuplicatePolicy

# Create two documents that share an ID but differ in content
doc1 = Document(content="Original content", id="doc_123")
doc2 = Document(content="Updated content", id="doc_123")

# Store the original first, then write the duplicate with SKIP:
# the existing document is kept and nothing new is written
document_store.write_documents([doc1], policy=DuplicatePolicy.NONE)
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.SKIP)
print(f"Skipped duplicates, written: {written_count}")  # Should be 0

# Write the same duplicate with OVERWRITE: the stored document is replaced
written_count = document_store.write_documents([doc2], policy=DuplicatePolicy.OVERWRITE)
print(f"Overwritten duplicates, written: {written_count}")  # Should be 1

# Check the updated content; the lookup returns a list, so take the first entry
retrieved_doc = document_store.get_documents_by_id(["doc_123"])[0]
print(f"Updated content: {retrieved_doc.content}")  # "Updated content"

Working with Embeddings

from haystack.components.embedders import OpenAIDocumentEmbedder

# Create documents with embeddings
# NOTE(review): OpenAIDocumentEmbedder presumably calls the OpenAI API and
# needs credentials (e.g. an OPENAI_API_KEY environment variable) - confirm.
embedder = OpenAIDocumentEmbedder()
docs_to_embed = [
    Document(content="Vector databases store high-dimensional data."),
    Document(content="Similarity search finds related documents."),
    Document(content="Embeddings capture semantic meaning.")
]

# Generate embeddings; the embedder's result is a dict whose "documents"
# entry holds the documents to store
embedding_result = embedder.run(documents=docs_to_embed)
embedded_docs = embedding_result["documents"]

# Store documents with embeddings
document_store.write_documents(embedded_docs)

# Check embedding count
embedding_count = document_store.get_embedding_count()
print(f"Documents with embeddings: {embedding_count}")

# Configure similarity function.
# These are new store instances, separate from `document_store` above;
# the similarity function is chosen when the store is constructed.
document_store_cosine = InMemoryDocumentStore(
    embedding_similarity_function="cosine"
)

document_store_dot = InMemoryDocumentStore(
    embedding_similarity_function="dot_product"
)

BM25 Configuration

# Configure BM25 parameters
bm25_config = {
    "k1": 1.5,  # Term frequency saturation parameter
    "b": 0.75   # Length normalization parameter
}

# The custom regex \b\w+\b also keeps single-character tokens, whereas the
# store's default pattern only matches tokens of two or more word characters.
document_store_bm25 = InMemoryDocumentStore(
    bm25_algorithm="BM25Okapi",
    bm25_parameters=bm25_config,
    bm25_tokenization_regex=r"\b\w+\b"  # Custom tokenization
)

# Write documents for BM25 search
text_docs = [
    Document(content="Natural language processing enables computers to understand text."),
    Document(content="Machine learning algorithms learn patterns from data."),
    Document(content="Deep learning uses neural networks with many layers.")
]

document_store_bm25.write_documents(text_docs)

# BM25 search is performed through InMemoryBM25Retriever, bound to the store
from haystack.components.retrievers import InMemoryBM25Retriever

bm25_retriever = InMemoryBM25Retriever(document_store=document_store_bm25)
search_results = bm25_retriever.run(query="machine learning neural networks")

# Each returned document exposes its relevance score as doc.score
for doc in search_results["documents"]:
    print(f"BM25 Score: {doc.score:.3f} - {doc.content}")

Document Management Operations

import time

# Bulk document operations
bulk_docs = [
    Document(content=f"Document {i}", meta={"batch": "bulk_1"})
    for i in range(100)
]

# Write large batch and time it; perf_counter() is the clock intended for
# measuring elapsed intervals (it is not affected by system clock changes)
start_time = time.perf_counter()
written_count = document_store.write_documents(bulk_docs)
end_time = time.perf_counter()
print(f"Wrote {written_count} documents in {end_time - start_time:.2f} seconds")

# Delete by filter: no delete-by-filter call is listed for this store, so
# select the matching documents first and then delete them by ID.
# Filters use the explicit {"field", "operator", "value"} form with
# metadata keys addressed as "meta.<key>".
batch_docs = document_store.filter_documents(
    filters={"field": "meta.batch", "operator": "==", "value": "bulk_1"}
)
doc_ids_to_delete = [doc.id for doc in batch_docs[:50]]  # first half of the batch
document_store.delete_documents(doc_ids_to_delete)

print(f"Remaining documents: {document_store.count_documents()}")

# Update document metadata: modify the document, then re-write it under the
# same ID with OVERWRITE so the stored copy is replaced
doc_to_update = document_store.get_all_documents()[0]
doc_to_update.meta["updated"] = True
doc_to_update.meta["update_time"] = "2024-01-01"

document_store.write_documents([doc_to_update], policy=DuplicatePolicy.OVERWRITE)

Custom Filter Policies

# NOTE(review): this example follows the FilterPolicy / apply_filter_policy
# signatures listed on this page. Upstream Haystack appears to define
# FilterPolicy as an Enum and apply_filter_policy with different parameters -
# verify against the installed package before relying on this snippet.
from haystack.document_stores.types import FilterPolicy, apply_filter_policy

# Define custom filter policy: the operators listed in `conditions` are the
# allowed ones
policy = FilterPolicy(
    conditions=["$eq", "$ne", "$in", "$nin", "$gte", "$lte"],
    on_invalid_filter="ignore"  # Ignore invalid filters instead of raising error
)

# Apply policy to filters; "$invalid" is deliberately not in the allowed list
raw_filters = {
    "category": "ai",
    "invalid_operator": {"$invalid": "value"},
    "score": {"$gte": 80}
}

validated_filters = apply_filter_policy(raw_filters, policy)
print(f"Validated filters: {validated_filters}")

# Use validated filters
filtered_docs = document_store.filter_documents(filters=validated_filters)

Multi-Store Pipeline Integration

from haystack import Pipeline
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentSplitter

# Create a document processing pipeline that splits documents and writes the
# resulting chunks into the existing document store
processing_pipeline = Pipeline()

# Add components.
# split_length=1 yields one chunk per sentence; without it the splitter
# groups many sentences per chunk and this short document would stay whole.
processing_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=1)
)
processing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))

# Connect components: the splitter's output documents feed the writer
processing_pipeline.connect("splitter.documents", "writer.documents")

# Process and store documents
large_documents = [
    Document(content="This is a long document. It contains multiple sentences. Each sentence will be split.")
]

result = processing_pipeline.run({
    "splitter": {"documents": large_documents}
})

# "documents_written" is already an integer count, so it is printed directly
# (calling len() on it would raise TypeError)
print(f"Processed and stored {result['writer']['documents_written']} document chunks")

# Verify storage
stored_chunks = document_store.get_all_documents()
for chunk in stored_chunks[-3:]:  # Show last 3 chunks
    print(f"Chunk: {chunk.content}")

Types

from typing import Protocol, List, Dict, Any, Optional, Literal
from enum import Enum
from haystack import Document

class DocumentStoreError(Exception):
    """Root exception type for errors raised by document store operations."""

class DuplicateDocumentError(DocumentStoreError):
    """Signals that handling of a duplicate document failed."""

class FilterCondition:
    """One filter condition, described by a field, an operator and a value."""

    # Only annotations are declared; no class-level defaults are assigned.
    field: str     # presumably the name of the field being tested
    operator: str  # presumably the comparison operator identifier
    value: Any     # operand for the comparison; any type is accepted

class SearchResult:
    """Outcome of a document search: the documents plus summary figures."""

    # Only annotations are declared; no class-level defaults are assigned.
    documents: List[Document]  # documents returned by the search
    total_count: int           # presumably the total number of matches - confirm
    query_time: float          # presumably elapsed query time; unit not stated here

Install with Tessl CLI

npx tessl i tessl/pypi-haystack-ai

docs

agent-framework.md

core-framework.md

document-processing.md

document-stores.md

evaluation.md

index.md

prompt-building.md

retrieval.md

text-embeddings.md

text-generation.md

tile.json