Haystack is an LLM framework for building customizable, production-ready LLM applications, with pipelines connecting models, vector databases, and data processors.
Retrievers are the search components in Haystack that find relevant documents from document stores based on queries. They implement various retrieval strategies including sparse keyword-based methods, dense vector similarity, and specialized multi-modal approaches.
from haystack.nodes.retriever import (
BaseRetriever,
# Sparse retrievers
BM25Retriever, TfidfRetriever, FilterRetriever,
# Dense retrievers
DenseRetriever, DensePassageRetriever, EmbeddingRetriever,
MultihopEmbeddingRetriever, TableTextRetriever,
# Specialized retrievers
MultiModalRetriever, WebRetriever, LinkContentFetcher
)

Abstract base class defining the retriever interface.
from haystack.nodes.retriever.base import BaseRetriever
from haystack.document_stores.base import BaseDocumentStore, FilterType
from haystack.schema import Document
from typing import List, Optional, Dict, Union
# NOTE(review): BaseComponent is not imported in this snippet — presumably
# provided by haystack.nodes.base; confirm against the installed version.
class BaseRetriever(BaseComponent):
    """Abstract base class for all retrievers."""

    def retrieve(
        self,
        query: str,
        filters: Optional[FilterType] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[Document]:
        """
        Retrieve documents most relevant to the query.

        Args:
            query: Search query string
            filters: Metadata filters to narrow search scope
            top_k: Number of documents to retrieve
            index: Document store index name
            headers: Custom HTTP headers for document store
            scale_score: Whether to normalize scores to [0,1] range
            document_store: Override default document store

        Returns:
            List of retrieved Document objects with relevance scores
        """

    def retrieve_batch(
        self,
        queries: List[str],
        # filters may be one shared filter dict or one (possibly None) per query
        filters: Optional[Union[FilterType, List[Optional[FilterType]]]] = None,
        top_k: Optional[int] = None,
        index: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        batch_size: Optional[int] = None,
        scale_score: Optional[bool] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ) -> List[List[Document]]:
        """Batch retrieval for multiple queries; returns one document list per query."""

Sparse retrievers use keyword-based methods like BM25 and TF-IDF for document retrieval.
Best Matching 25 algorithm for keyword-based retrieval.
from haystack.nodes.retriever import BM25Retriever
from haystack.document_stores import KeywordDocumentStore
class BM25Retriever(BaseRetriever):
    """Best Matching 25 (BM25) algorithm for keyword-based retrieval."""

    def __init__(
        self,
        document_store: Optional[KeywordDocumentStore] = None,
        top_k: int = 10,
        all_terms_must_match: bool = False,
        # NOTE(review): annotated Optional[str], but the usage example below
        # passes a dict — confirm accepted types for the installed version.
        custom_query: Optional[str] = None,
        scale_score: bool = True,
    ):
        """
        Initialize BM25 retriever.

        Args:
            document_store: Keyword-searchable document store
            top_k: Number of documents to retrieve
            all_terms_must_match: Whether all query terms must match (AND vs OR)
            custom_query: Custom Elasticsearch query template
            scale_score: Whether to normalize scores to [0,1]
        """

from haystack.nodes.retriever import BM25Retriever
# Example: BM25 retrieval against an Elasticsearch-backed document store.
from haystack.document_stores import ElasticsearchDocumentStore

# Basic setup
document_store = ElasticsearchDocumentStore()
bm25_retriever = BM25Retriever(
    document_store=document_store,
    top_k=10,
    all_terms_must_match=False
)

# Simple retrieval
documents = bm25_retriever.retrieve(
    query="Python machine learning framework",
    filters={"category": "documentation"}
)

# Custom Elasticsearch query: "${query}" and "${filters}" are placeholders
# substituted at retrieval time.
custom_bm25 = BM25Retriever(
    document_store=document_store,
    custom_query={
        "size": 10,
        "query": {
            "bool": {
                "should": [{"multi_match": {
                    "query": "${query}",
                    "type": "most_fields",
                    "fields": ["content", "title"]
                }}],
                "filter": "${filters}"
            }
        },
        "highlight": {
            "fields": {"content": {}, "title": {}}
        }
    }
)

# Access highlighted results
highlighted_docs = custom_bm25.retrieve(query="Haystack framework")
# Highlight snippets are exposed through document metadata.
highlighted_content = highlighted_docs[0].meta["highlighted"]["content"]

Term Frequency-Inverse Document Frequency retriever.
from haystack.nodes.retriever import TfidfRetriever
class TfidfRetriever(BaseRetriever):
    """Term Frequency-Inverse Document Frequency (TF-IDF) retriever."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        top_k: int = 10,
        auto_fit: bool = True,
    ):
        """
        Initialize TF-IDF retriever.

        Args:
            document_store: Document store to retrieve from
            top_k: Number of documents to retrieve
            auto_fit: Whether to automatically fit the TF-IDF model
        """

# Usage
tfidf_retriever = TfidfRetriever(
    document_store=document_store,
    top_k=10
)

# Fit the model (if not auto_fit)
tfidf_retriever.fit()

# Retrieve documents
results = tfidf_retriever.retrieve(query="information retrieval")

Metadata-based document filtering without similarity scoring.
from haystack.nodes.retriever import FilterRetriever
class FilterRetriever(BaseRetriever):
    """Metadata-based document filtering without similarity scoring."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        top_k: int = 10,
    ):
        """
        Initialize filter-based retriever.

        Args:
            document_store: Document store to filter
            top_k: Maximum number of documents to return
        """

# Usage - returns documents matching filters only
filter_retriever = FilterRetriever(document_store=document_store)

# Retrieve by metadata only (no query text needed)
filtered_docs = filter_retriever.retrieve(
    query="",  # Empty query
    filters={
        "source": "documentation",
        "date": {"$gte": "2023-01-01"},
        "status": "published"
    }
)

Dense retrievers use embedding vectors for semantic similarity search.
General-purpose embedding-based retriever.
from haystack.nodes.retriever import EmbeddingRetriever
from haystack.document_stores import FAISSDocumentStore
class EmbeddingRetriever(BaseRetriever):
    """General-purpose embedding-based (dense) retriever."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        model_version: Optional[str] = None,
        use_gpu: bool = True,
        batch_size: int = 32,
        max_seq_len: int = 512,
        model_format: str = "sentence_transformers",
        pooling_strategy: str = "reduce_mean",
        emb_extraction_layer: int = -1,
        top_k: int = 10,
        similarity_function: str = "dot_product",
        progress_bar: bool = True,
        devices: Optional[List[Union[str, torch.device]]] = None,
        use_auth_token: Optional[Union[str, bool]] = None,
        scale_score: bool = True,
        embed_title: bool = True,
        # api_key / azure_* apply when model_format selects a hosted provider
        # (e.g. "openai") — TODO confirm exact semantics per provider.
        api_key: Optional[str] = None,
        azure_api_version: str = "2022-12-01",
        azure_base_url: Optional[str] = None,
    ):
        """
        Initialize embedding retriever.

        Args:
            document_store: Vector-enabled document store
            embedding_model: Model name or path for embeddings
            model_format: Format type ("sentence_transformers", "transformers", "openai")
            use_gpu: Whether to use GPU for embedding generation
            batch_size: Batch size for embedding generation
            max_seq_len: Maximum sequence length for model
            similarity_function: Similarity metric ("dot_product", "cosine")
            embed_title: Whether to include document title in embeddings
        """

Facebook's Dense Passage Retrieval implementation.
from haystack.nodes.retriever import DensePassageRetriever
class DensePassageRetriever(BaseRetriever):
    """Dense Passage Retrieval (DPR): separate query and passage encoders."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        query_embedding_model: Union[str, Path] = "facebook/dpr-question_encoder-single-nq-base",
        passage_embedding_model: Union[str, Path] = "facebook/dpr-ctx_encoder-single-nq-base",
        model_version: Optional[str] = None,
        max_seq_len_query: int = 64,
        max_seq_len_passage: int = 256,
        top_k: int = 10,
        use_gpu: bool = True,
        batch_size: int = 16,
        embed_title: bool = True,
        use_fast_tokenizers: bool = True,
        infer_tokenizer_classes: bool = False,
        similarity_function: str = "dot_product",
        progress_bar: bool = True,
        devices: Optional[List[Union[str, torch.device]]] = None,
        use_auth_token: Optional[Union[str, bool]] = None,
        scale_score: bool = True,
    ):
        """
        Initialize DPR retriever.

        Args:
            document_store: Vector-enabled document store
            query_embedding_model: Model for encoding queries
            passage_embedding_model: Model for encoding passages
            max_seq_len_query: Max sequence length for queries
            max_seq_len_passage: Max sequence length for passages
            embed_title: Whether to embed document titles
            similarity_function: Similarity metric for ranking
        """

from haystack.nodes.retriever import EmbeddingRetriever, DensePassageRetriever
# Example: dense retrieval with FAISS as the vector store.
from haystack.document_stores import FAISSDocumentStore

# Embedding retriever setup
document_store = FAISSDocumentStore(
    vector_dim=384,
    faiss_index_factory_str="Flat"
)
embedding_retriever = EmbeddingRetriever(
    document_store=document_store,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    model_format="sentence_transformers",
    top_k=10
)

# Generate embeddings for documents
document_store.update_embeddings(embedding_retriever)

# Semantic search
semantic_results = embedding_retriever.retrieve(
    query="How to build chatbots with AI?",
    top_k=5
)

# DPR retriever for question-answering
dpr_retriever = DensePassageRetriever(
    document_store=document_store,
    query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
    passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
    max_seq_len_query=64,
    max_seq_len_passage=256
)

# Generate DPR embeddings (re-embeds the store with the DPR passage encoder)
document_store.update_embeddings(dpr_retriever)

# QA-optimized retrieval
qa_results = dpr_retriever.retrieve(
    query="What is the capital of France?",
    top_k=3
)

Multi-step reasoning retriever for complex queries.
from haystack.nodes.retriever import MultihopEmbeddingRetriever
class MultihopEmbeddingRetriever(BaseRetriever):
    """Multi-step (multi-hop) reasoning retriever for complex queries."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        num_iterations: int = 2,
        top_k: int = 10,
        use_gpu: bool = True,
        batch_size: int = 32,
    ):
        """
        Initialize multi-hop retriever for iterative document retrieval.

        Args:
            document_store: Vector-enabled document store
            embedding_model: Model for generating embeddings
            num_iterations: Number of retrieval iterations
            top_k: Documents per iteration
        """

# Usage for complex multi-step queries
multihop_retriever = MultihopEmbeddingRetriever(
    document_store=document_store,
    num_iterations=3,
    top_k=5
)

# Complex reasoning query
complex_results = multihop_retriever.retrieve(
    query="What company founded by Steve Jobs created the iPhone and when was it released?"
)

Joint retrieval from both text and tabular data.
from haystack.nodes.retriever import TableTextRetriever
class TableTextRetriever(BaseRetriever):
    """Joint retrieval from both text passages and tables."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        query_embedding_model: str = "deepset/all-mpnet-base-v2-table",
        passage_embedding_model: str = "deepset/all-mpnet-base-v2-table",
        table_embedding_model: str = "deepset/all-mpnet-base-v2-table",
        model_version: Optional[str] = None,
        max_seq_len: int = 256,
        use_gpu: bool = True,
        batch_size: int = 16,
        similarity_function: str = "dot_product",
        top_k: int = 10,
    ):
        """
        Initialize table-text joint retriever.

        Args:
            document_store: Document store with text and table documents
            query_embedding_model: Model for query embeddings
            passage_embedding_model: Model for text passage embeddings
            table_embedding_model: Model for table embeddings
            similarity_function: Similarity computation method
        """

# Usage with mixed text and table documents
table_text_retriever = TableTextRetriever(
    document_store=document_store,
    query_embedding_model="deepset/all-mpnet-base-v2-table"
)

# Retrieve from both text and tables
mixed_results = table_text_retriever.retrieve(
    query="What was the revenue in Q4 2022?",
    top_k=10
)

Retrieval across multiple content modalities (text, images, etc.).
from haystack.nodes.retriever import MultiModalRetriever
class MultiModalRetriever(BaseRetriever):
    """Retrieval across multiple content modalities (text, images, etc.)."""

    def __init__(
        self,
        document_store: Optional[BaseDocumentStore] = None,
        query_embedding_model: str = "sentence-transformers/clip-ViT-B-32",
        document_embedding_models: Optional[Dict[str, str]] = None,
        top_k: int = 10,
        progress_bar: bool = True,
    ):
        """
        Initialize multi-modal retriever.

        Args:
            document_store: Document store with multi-modal documents
            query_embedding_model: Model for query embeddings (e.g., CLIP)
            document_embedding_models: Models per content type
            top_k: Number of documents to retrieve
        """

# Usage with images and text: one embedding model per content type
multimodal_retriever = MultiModalRetriever(
    document_store=document_store,
    query_embedding_model="sentence-transformers/clip-ViT-B-32",
    document_embedding_models={
        "text": "sentence-transformers/all-MiniLM-L6-v2",
        "image": "sentence-transformers/clip-ViT-B-32"
    }
)

# Search across text and images
multimodal_results = multimodal_retriever.retrieve(
    query="sunset over mountains",
    top_k=5
)

Web search integration for external content retrieval.
from haystack.nodes.retriever import WebRetriever
class WebRetriever(BaseRetriever):
    """Web search integration for external content retrieval."""

    def __init__(
        self,
        api_key: str,
        search_engine_provider: str = "SerperDev",
        top_k: int = 10,
        mode: str = "preprocessed_documents",
        # NOTE(review): BasePreProcessor is not imported in this snippet —
        # presumably from haystack.nodes.preprocessor; confirm import path.
        preprocessor: Optional[BasePreProcessor] = None,
        cache_document: Optional[bool] = None,
        cache_index: Optional[str] = None,
        cache_headers: Optional[Dict[str, str]] = None,
        document_store: Optional[BaseDocumentStore] = None,
    ):
        """
        Initialize web search retriever.

        Args:
            api_key: API key for search engine provider
            search_engine_provider: Provider ("SerperDev", "SerpAPI")
            mode: Return mode ("preprocessed_documents", "raw_documents", "snippets")
            preprocessor: Text preprocessor for web content
            cache_document: Whether to cache retrieved documents
            document_store: Optional document store for caching
        """

# Usage for web search
web_retriever = WebRetriever(
    api_key="your-serper-api-key",
    search_engine_provider="SerperDev",
    top_k=10,
    mode="preprocessed_documents"
)

# Search the web
web_results = web_retriever.retrieve(
    query="latest developments in large language models 2024"
)

Fetch and process content from web links.
from haystack.nodes.retriever import LinkContentFetcher
class LinkContentFetcher(BaseRetriever):
    """Fetch and process content from web links."""

    def __init__(
        self,
        raise_on_failure: bool = False,
        suppress_extraction_errors: bool = True,
    ):
        """
        Initialize link content fetcher.

        Args:
            raise_on_failure: Whether to raise exceptions on fetch failures
            suppress_extraction_errors: Whether to suppress content extraction errors
        """

# Usage for processing web links
link_fetcher = LinkContentFetcher()

# Process documents containing URLs (URL carried in metadata, content empty)
documents_with_links = [
    Document(content="", meta={"url": "https://example.com/article1"}),
    Document(content="", meta={"url": "https://example.com/article2"})
]

# Fetch content from URLs
fetched_content = link_fetcher.run(documents=documents_with_links)

All retrievers support efficient batch processing:
# Batch queries for efficiency
queries = [
    "What is machine learning?",
    "How does deep learning work?",
    "What are neural networks?"
]

# Batch retrieval: one document list is returned per query, in query order
batch_results = embedding_retriever.retrieve_batch(
    queries=queries,
    top_k=5,
    batch_size=10
)

# Process results
for i, query in enumerate(queries):
    docs = batch_results[i]
    print(f"Query: {query}")
    print(f"Found {len(docs)} documents")

# GPU acceleration for embeddings
gpu_retriever = EmbeddingRetriever(
    document_store=document_store,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    use_gpu=True,
    batch_size=64,  # Larger batch for GPU efficiency
    devices=["cuda:0"]
)

# Optimize for production
production_retriever = EmbeddingRetriever(
    document_store=document_store,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    use_gpu=True,
    batch_size=128,
    progress_bar=False,  # Disable for production
    scale_score=True
)

# Complex filter examples
# Filters use MongoDB-style operators ($and, $or, $in, $gte, $lt, $lte).
advanced_filters = {
    "$and": [
        {"source": {"$in": ["docs", "tutorials"]}},
        {"date": {"$gte": "2023-01-01"}},
        {"$or": [
            {"category": "beginner"},
            {"rating": {"$gte": 4.5}}
        ]},
        {"tags": {"$in": ["python", "ai"]}}
    ]
}

# Apply complex filters
filtered_results = embedding_retriever.retrieve(
    query="getting started with AI",
    filters=advanced_filters,
    top_k=10
)

# Date range filtering
date_filters = {
    "published_date": {
        "$gte": "2023-01-01",
        "$lt": "2024-01-01"
    }
}

# Numeric range filtering
score_filters = {
    "confidence": {"$gte": 0.8},
    "word_count": {"$gte": 100, "$lte": 5000}
}

# Evaluate retriever performance
eval_result = bm25_retriever.eval(
    label_index="evaluation_labels",
    doc_index="evaluation_docs",
    top_k=10,
    open_domain=True,
    return_preds=True
)

# Access metrics
print(f"Recall@10: {eval_result['recall']}")
print(f"MAP: {eval_result['map']}")
print(f"MRR: {eval_result['mrr']}")

# Performance timing
# NOTE(review): query_time / query_count attributes assumed to be tracked by
# the retriever instance — confirm availability on the installed version.
print(f"Average query time: {bm25_retriever.query_time / bm25_retriever.query_count:.3f}s")

from haystack import Pipeline
# Create retrieval pipeline
pipeline = Pipeline()
pipeline.add_node(
    component=embedding_retriever,
    name="Retriever",
    inputs=["Query"]
)

# Add additional processing
# NOTE(review): `reader` is assumed to be a configured Reader node defined
# elsewhere — not shown in this snippet.
pipeline.add_node(
    component=reader,
    name="Reader",
    inputs=["Retriever"]
)

# Execute pipeline: per-node parameters are keyed by node name
result = pipeline.run(
    query="How to use Haystack retrievers?",
    params={
        "Retriever": {
            "top_k": 5,
            "filters": {"source": "documentation"}
        },
        "Reader": {"top_k": 3}
    }
)

Retrievers form the core search capability in Haystack, enabling everything from simple keyword matching to sophisticated semantic search and multi-modal retrieval across diverse content types and storage backends.
Install with Tessl CLI
npx tessl i tessl/pypi-farm-haystack