Haystack: an LLM framework for building customizable, production-ready LLM applications.

Essential framework components for building pipelines, managing data flow, creating custom components, and handling serialization. The core framework provides the foundation for all Haystack functionality.

Pipelines create and orchestrate data flows between components using directed acyclic graphs (DAGs).
class Pipeline:
    """Synchronous pipeline that orchestrates data flow between components as a DAG."""

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True,
    ) -> None:
        """
        Initialize a new Pipeline.

        Args:
            metadata: Arbitrary dictionary to store metadata about this Pipeline.
            max_runs_per_component: Maximum number of times a component can run
                in a single pipeline execution.
            connection_type_validation: Whether the pipeline will validate the
                types of the connections between components.
        """

    def add_component(self, name: str, instance: Any) -> None:
        """
        Add a component to the pipeline.

        Args:
            name: Unique name for the component.
            instance: Component instance to add.
        """

    def connect(self, sender: str, receiver: str) -> None:
        """
        Connect components in the pipeline.

        Args:
            sender: Output socket in the format "component_name.output_name".
            receiver: Input socket in the format "component_name.input_name".
        """

    def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None,
    ) -> Dict[str, Any]:
        """
        Run the pipeline with the given input data.

        Args:
            data: Input data for pipeline components.
            include_outputs_from: Set of component names to include outputs from.
            break_point: Breakpoint configuration for debugging (keyword-only).
            pipeline_snapshot: Pipeline snapshot for resuming execution (keyword-only).

        Returns:
            Dictionary containing outputs from all components.
        """

    def draw(
        self,
        *,
        path: Path,
        server_url: str = "https://mermaid.ink",
        params: Optional[Dict] = None,
    ) -> None:
        """
        Draw a visual representation of the pipeline.

        Args:
            path: File path to save the visualization (required, keyword-only).
            server_url: Mermaid server URL used for rendering.
            params: Additional parameters for the rendering.
        """
class AsyncPipeline:
    """Asynchronous counterpart of Pipeline; `run` is a coroutine."""

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True,
    ) -> None:
        """
        Initialize an asynchronous Pipeline.

        Args:
            metadata: Arbitrary dictionary to store metadata about this Pipeline.
            max_runs_per_component: Maximum number of times a component can run
                in a single pipeline execution.
            connection_type_validation: Whether the pipeline will validate the
                types of the connections between components.
        """

    def add_component(self, name: str, instance: Any) -> None:
        """Add a component to the async pipeline."""

    def connect(self, sender: str, receiver: str) -> None:
        """Connect components in the async pipeline."""

    async def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        pipeline_snapshot: Optional[PipelineSnapshot] = None,
    ) -> Dict[str, Any]:
        """
        Run the async pipeline with the given input data.

        Args:
            data: Input data for pipeline components.
            include_outputs_from: Set of component names to include outputs from.
            break_point: Breakpoint configuration for debugging (keyword-only).
            pipeline_snapshot: Pipeline snapshot for resuming execution (keyword-only).

        Returns:
            Dictionary containing outputs from all components.
        """
class PredefinedPipeline:
    """Factory for creating pipelines from predefined templates."""

    @classmethod
    def from_template(cls, template_name: str, **kwargs) -> Pipeline:
        """
        Create a pipeline from a predefined template.

        Args:
            template_name: Name of the template to use.
            **kwargs: Template-specific configuration.

        Returns:
            Configured Pipeline instance.
        """


# Build custom components and integrate them into pipelines.
@component
def custom_component(input_param: str) -> Dict[str, Any]:
    """
    Example of the @component decorator, which creates a Haystack component from a function.

    The decorated function becomes a runnable component that can be added to pipelines.

    Args:
        input_param: Example input parameter.

    Returns:
        Dictionary with component outputs.
    """
class Component:
    """Base class for all Haystack components."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Execute the component logic.

        Args:
            **kwargs: Component input parameters.

        Returns:
            Dictionary containing component outputs.
        """

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this component to a dictionary."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Component":
        """Deserialize a component from a dictionary."""
class SuperComponent:
    """Advanced component base class with additional capabilities."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """Execute the super component logic."""
@super_component
def advanced_component() -> None:
    """Example of the @super_component decorator for creating super components."""


# Core data structures that flow between components in pipelines.
class Document:
    """A unit of data carrying content, binary data, metadata, scores, and embeddings."""

    # Declared fields (populated by __init__).
    id: str
    content: Optional[str]
    blob: Optional[ByteStream]
    meta: Dict[str, Any]
    score: Optional[float]
    embedding: Optional[List[float]]
    sparse_embedding: Optional[SparseEmbedding]

    def __init__(
        self,
        id: str = "",
        content: Optional[str] = None,
        blob: Optional[ByteStream] = None,
        # Optional[...] = None rather than a shared mutable {} default.
        meta: Optional[Dict[str, Any]] = None,
        score: Optional[float] = None,
        embedding: Optional[List[float]] = None,
        sparse_embedding: Optional[SparseEmbedding] = None,
    ) -> None:
        """
        Initialize a Document.

        Args:
            id: Unique document identifier.
            content: Text content of the document.
            blob: Binary data associated with the document.
            meta: Metadata dictionary.
            score: Relevance score (used in retrieval).
            embedding: Vector embedding of the document.
            sparse_embedding: Sparse vector representation of the document.
        """

    def to_dict(self) -> Dict[str, Any]:
        """Convert the document to a dictionary."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Document":
        """Create a document from a dictionary."""
class Answer:
    """Protocol for answer types: the original query, the answer data, and metadata."""

    query: str
    data: str
    meta: Dict[str, Any]
class GeneratedAnswer:
    """An answer produced by a generator, together with its query and source documents."""

    # Declared fields (populated by __init__).
    data: str
    query: str
    documents: List[Document]
    meta: Dict[str, Any]

    def __init__(
        self,
        data: str,
        query: str = "",
        # Optional[...] = None rather than shared mutable defaults.
        documents: Optional[List[Document]] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Initialize a GeneratedAnswer.

        Args:
            data: Generated answer text.
            query: Original query.
            documents: Source documents used for generation.
            meta: Additional metadata.
        """
class ExtractedAnswer:
    """An answer extracted from a document, with offsets locating it in the source text."""

    # Declared fields (populated by __init__).
    query: str
    score: Optional[float]
    data: str
    document: Optional[Document]
    context: Optional[str]
    offsets_in_document: List[Span]
    offsets_in_context: List[Span]
    meta: Dict[str, Any]

    def __init__(
        self,
        query: str,
        score: Optional[float] = None,
        data: str = "",
        document: Optional[Document] = None,
        context: Optional[str] = None,
        # Optional[...] = None rather than shared mutable defaults.
        offsets_in_document: Optional[List[Span]] = None,
        offsets_in_context: Optional[List[Span]] = None,
        meta: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Initialize an ExtractedAnswer.

        Args:
            query: Original query.
            score: Confidence score.
            data: Extracted answer text.
            document: Source document.
            context: Context around the answer.
            offsets_in_document: Character offsets in the original document.
            offsets_in_context: Character offsets in the context.
            meta: Additional metadata.
        """


# Handle serialization and deserialization of components and data structures.
def default_to_dict(obj: Any) -> Dict[str, Any]:
    """
    Default serialization function for Haystack objects.

    Args:
        obj: Object to serialize.

    Returns:
        Dictionary representation of the object.
    """
def default_from_dict(data: Dict[str, Any]) -> Any:
    """
    Default deserialization function for Haystack objects.

    Args:
        data: Dictionary to deserialize.

    Returns:
        Deserialized object.
    """


# Handle component and pipeline execution errors.
class ComponentError(Exception):
    """Exception raised when a component encounters an error."""

    def __init__(self, message: str, component_name: Optional[str] = None) -> None:
        """
        Initialize ComponentError.

        Args:
            message: Error description.
            component_name: Name of the component that raised the error.
        """
        # Forward the message to Exception so str(e) and e.args behave normally.
        super().__init__(message)
        self.component_name = component_name
class DeserializationError(Exception):
    """Exception raised during object deserialization."""

    def __init__(self, message: str, data: Optional[Dict[str, Any]] = None) -> None:
        """
        Initialize DeserializationError.

        Args:
            message: Error description.
            data: Data that failed to deserialize.
        """
        # Forward the message to Exception so str(e) and e.args behave normally.
        super().__init__(message)
        self.data = data


# Example: creating and running a custom component.
from haystack import component
@component
def text_processor(text: str, operation: str = "upper") -> Dict[str, str]:
    """Process text with the specified operation ("upper", "lower", or pass-through)."""
    if operation == "upper":
        processed_text = text.upper()
    elif operation == "lower":
        processed_text = text.lower()
    else:
        # Unknown operations leave the text unchanged.
        processed_text = text
    return {"processed_text": processed_text}


# Use in a pipeline
from haystack import Pipeline

pipeline = Pipeline()
pipeline.add_component("processor", text_processor)
result = pipeline.run({"processor": {"text": "Hello World", "operation": "upper"}})
print(result["processor"]["processed_text"])  # "HELLO WORLD"
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# Create the pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer: {{question}}"))
pipeline.add_component("generator", OpenAIGenerator())

# Connect components: the rendered prompt feeds the generator
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Run the pipeline
result = pipeline.run({
    "prompt_builder": {"question": "What is Python?"}
})
print(result["generator"]["replies"][0])

from typing import Protocol, Dict, Any, List, Optional, Set, Union
from dataclasses import dataclass
from pathlib import Path
@dataclass
class Span:
    """A character-offset span with start and end positions within a text."""

    start: int
    end: int
class ByteStream:
    """Binary data stream for document content."""

    data: bytes
class SparseEmbedding:
    """Sparse vector representation with indices and values."""

    indices: List[int]
    values: List[float]
class Breakpoint:
    """Breakpoint configuration for pipeline debugging."""
class AgentBreakpoint:
    """Agent-specific breakpoint configuration."""
class PipelineSnapshot:
    """Pipeline execution snapshot for resuming a run."""
@dataclass
class ComponentInfo:
    """Descriptive record of a component: its name, type, and input/output sockets."""

    name: str
    type: str
    inputs: Dict[str, Any]
    outputs: Dict[str, Any]


# Install with Tessl CLI
Install with the Tessl CLI: npx tessl i tessl/pypi-haystack-ai