Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-haystack-ai

LLM framework to build customizable, production-ready LLM applications.

Pending
Overview
Eval results
Files

docs/core-framework.md

Core Framework

Essential framework components for building pipelines, managing data flow, creating custom components, and handling serialization. The core framework provides the foundation for all Haystack functionality.

Capabilities

Pipeline Management

Create and orchestrate data flows between components using directed acyclic graphs (DAGs).

class Pipeline:
    """Orchestrates data flow between components as a directed acyclic graph (DAG)."""

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True
    ) -> None:
        """
        Initialize a new Pipeline.

        Args:
            metadata: Arbitrary dictionary to store metadata about this Pipeline
            max_runs_per_component: Maximum number of times a component can run in a single pipeline execution
            connection_type_validation: Whether the pipeline will validate the types of the connections
        """

    def add_component(self, name: str, instance: Any) -> None:
        """
        Add a component to the pipeline.

        Args:
            name: Unique name for the component
            instance: Component instance to add
        """

    def connect(self, sender: str, receiver: str) -> None:
        """
        Connect components in the pipeline.

        Args:
            sender: Output socket in format "component_name.output_name"
            receiver: Input socket in format "component_name.input_name"
        """

    def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        # Forward references are quoted: Breakpoint, AgentBreakpoint and
        # PipelineSnapshot are declared later in this module.
        break_point: Optional[Union["Breakpoint", "AgentBreakpoint"]] = None,
        pipeline_snapshot: Optional["PipelineSnapshot"] = None
    ) -> Dict[str, Any]:
        """
        Run the pipeline with given input data.

        Args:
            data: Input data for pipeline components, keyed by component name
            include_outputs_from: Set of component names to include outputs from
            break_point: Breakpoint configuration for debugging
            pipeline_snapshot: Pipeline snapshot for resuming execution

        Returns:
            Dictionary containing outputs from all components
        """

    def draw(
        self,
        *,
        path: Path,
        server_url: str = "https://mermaid.ink",
        params: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Draw a visual representation of the pipeline.

        Args:
            path: File path to save the visualization (required)
            server_url: Mermaid server URL for rendering
            params: Additional parameters for the rendering
        """

class AsyncPipeline:
    """Asynchronous counterpart of Pipeline; ``run`` is awaitable."""

    def __init__(
        self,
        metadata: Optional[Dict[str, Any]] = None,
        max_runs_per_component: int = 100,
        connection_type_validation: bool = True
    ) -> None:
        """
        Initialize an asynchronous Pipeline.

        Args:
            metadata: Arbitrary dictionary to store metadata about this Pipeline
            max_runs_per_component: Maximum number of times a component can run in a single pipeline execution
            connection_type_validation: Whether the pipeline will validate the types of the connections
        """

    def add_component(self, name: str, instance: Any) -> None:
        """Add a component to the async pipeline under a unique name."""

    def connect(self, sender: str, receiver: str) -> None:
        """Connect an output socket to an input socket ("name.socket" format)."""

    async def run(
        self,
        data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        *,
        # Forward references are quoted: Breakpoint, AgentBreakpoint and
        # PipelineSnapshot are declared later in this module.
        break_point: Optional[Union["Breakpoint", "AgentBreakpoint"]] = None,
        pipeline_snapshot: Optional["PipelineSnapshot"] = None
    ) -> Dict[str, Any]:
        """
        Run the async pipeline with given input data.

        Args:
            data: Input data for pipeline components, keyed by component name
            include_outputs_from: Set of component names to include outputs from
            break_point: Breakpoint configuration for debugging
            pipeline_snapshot: Pipeline snapshot for resuming execution

        Returns:
            Dictionary containing outputs from all components
        """

class PredefinedPipeline:
    """Factory for building ready-made pipelines from named templates."""

    @classmethod
    def from_template(cls, template_name: str, **kwargs) -> "Pipeline":
        """
        Create a pipeline from a predefined template.

        Args:
            template_name: Name of the template to use
            **kwargs: Template-specific configuration, passed through to the template

        Returns:
            Configured Pipeline instance
        """

Component System

Build custom components and integrate them into pipelines.

@component
def custom_component(input_param: str) -> Dict[str, Any]:
    """
    Illustrative use of the ``@component`` decorator.

    Decorating a function turns it into a runnable Haystack component that can
    be added to pipelines; its return dictionary becomes the component outputs.

    Args:
        input_param: Example input parameter

    Returns:
        Dictionary with component outputs
    """

class Component:
    """Common base class that all Haystack components derive from."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """
        Invoke the component with the given inputs.

        Args:
            **kwargs: Named inputs consumed by the component

        Returns:
            Mapping of output names to produced values
        """

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation of this component."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Component":
        """Rebuild a component instance from its dictionary representation."""

class SuperComponent:
    """Component base class extended with additional capabilities."""

    def run(self, **kwargs) -> Dict[str, Any]:
        """Invoke the super component with the given inputs and return its outputs."""

@super_component
def advanced_component() -> None:
    """Illustrative use of the ``@super_component`` decorator for creating super components."""

Data Classes

Core data structures that flow between components in pipelines.

class Document:
    """Primary data container that flows between components in a pipeline."""

    def __init__(
        self,
        id: str = "",
        content: Optional[str] = None,
        # Forward references are quoted: ByteStream and SparseEmbedding are
        # declared later in this module.
        blob: Optional["ByteStream"] = None,
        meta: Optional[Dict[str, Any]] = None,
        score: Optional[float] = None,
        embedding: Optional[List[float]] = None,
        sparse_embedding: Optional["SparseEmbedding"] = None
    ) -> None:
        """
        Initialize a Document.

        Args:
            id: Unique document identifier
            content: Text content of the document
            blob: Binary data associated with the document
            meta: Metadata dictionary; None when no metadata is supplied
            score: Relevance score (used in retrieval)
            embedding: Vector embedding of the document
            sparse_embedding: Sparse vector representation of the document
        """

    # Declared attributes of a constructed Document.
    id: str
    content: Optional[str]
    blob: Optional["ByteStream"]
    meta: Dict[str, Any]
    score: Optional[float]
    embedding: Optional[List[float]]
    sparse_embedding: Optional["SparseEmbedding"]

    def to_dict(self) -> Dict[str, Any]:
        """Convert document to dictionary."""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Document":
        """Create document from dictionary."""

class Answer:
    """Protocol for answer types."""
    # NOTE(review): described as a protocol but declared as a plain class —
    # presumably typing.Protocol in the actual package; confirm upstream.
    query: str  # the query this answer responds to
    data: str  # the answer text
    meta: Dict[str, Any]  # additional metadata

class GeneratedAnswer:
    """Answer produced by a generator, optionally linked to source documents."""

    def __init__(
        self,
        data: str,
        query: str = "",
        # "Document" is quoted so the class is self-contained as a reference.
        documents: Optional[List["Document"]] = None,
        meta: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Initialize a GeneratedAnswer.

        Args:
            data: Generated answer text
            query: Original query
            documents: Source documents used for generation (None when not provided)
            meta: Additional metadata (None when not provided)
        """

    # Declared attributes of a constructed GeneratedAnswer.
    data: str
    query: str
    documents: List["Document"]
    meta: Dict[str, Any]

class ExtractedAnswer:
    """Answer extracted verbatim from a document, with offsets and a confidence score."""

    def __init__(
        self,
        query: str,
        score: Optional[float] = None,
        data: str = "",
        document: Optional["Document"] = None,
        context: Optional[str] = None,
        # Forward reference quoted: Span is declared later in this module.
        offsets_in_document: Optional[List["Span"]] = None,
        offsets_in_context: Optional[List["Span"]] = None,
        meta: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Initialize an ExtractedAnswer.

        Args:
            query: Original query
            score: Confidence score
            data: Extracted answer text
            document: Source document
            context: Context around the answer
            offsets_in_document: Character offsets in original document (None when not provided)
            offsets_in_context: Character offsets in context (None when not provided)
            meta: Additional metadata (None when not provided)
        """

    # Declared attributes of a constructed ExtractedAnswer.
    query: str
    score: Optional[float]
    data: str
    document: Optional["Document"]
    context: Optional[str]
    offsets_in_document: List["Span"]
    offsets_in_context: List["Span"]
    meta: Dict[str, Any]

Serialization

Handle serialization and deserialization of components and data structures.

def default_to_dict(obj: Any) -> Dict[str, Any]:
    """
    Serialize a Haystack object to a dictionary using the default strategy.

    Args:
        obj: The object to serialize

    Returns:
        Dictionary representation of the object
    """

def default_from_dict(data: Dict[str, Any]) -> Any:
    """
    Reconstruct a Haystack object from its dictionary form using the default strategy.

    Args:
        data: Dictionary previously produced by serialization

    Returns:
        The reconstructed object
    """

Error Handling

Handle component and pipeline execution errors.

class ComponentError(Exception):
    """Exception raised when a component encounters an error."""

    def __init__(self, message: str, component_name: Optional[str] = None) -> None:
        """
        Initialize ComponentError.

        Args:
            message: Error description
            component_name: Name of the component that raised the error
        """
        # Forward the message to Exception so str(err) and err.args behave normally.
        super().__init__(message)
        self.component_name = component_name

class DeserializationError(Exception):
    """Exception raised during object deserialization."""

    def __init__(self, message: str, data: Optional[Dict[str, Any]] = None) -> None:
        """
        Initialize DeserializationError.

        Args:
            message: Error description
            data: Data that failed to deserialize
        """
        # Forward the message to Exception so str(err) and err.args behave normally.
        super().__init__(message)
        self.data = data

Usage Examples

Creating a Custom Component

from haystack import component

@component
def text_processor(text: str, operation: str = "upper") -> Dict[str, str]:
    """Apply the named string operation ("upper" or "lower") to the text.

    Any unrecognized operation leaves the text unchanged.
    """
    transforms = {"upper": str.upper, "lower": str.lower}
    transform = transforms.get(operation)
    return {"processed_text": transform(text) if transform else text}

# Use in pipeline
from haystack import Pipeline

pipeline = Pipeline()
pipeline.add_component("processor", text_processor)

# Inputs are keyed by component name, then by that component's parameter names.
result = pipeline.run({"processor": {"text": "Hello World", "operation": "upper"}})
print(result["processor"]["processed_text"])  # "HELLO WORLD"

Building a Simple Pipeline

from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

# Create pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("prompt_builder", PromptBuilder(template="Answer: {{question}}"))
pipeline.add_component("generator", OpenAIGenerator())

# Connect components: "component_name.output_name" -> "component_name.input_name"
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Run pipeline; inputs are keyed by component name, then input name
result = pipeline.run({
    "prompt_builder": {"question": "What is Python?"}
})

# Each component's outputs appear under its name in the result
print(result["generator"]["replies"][0])

Types

from typing import Protocol, Dict, Any, List, Optional, Set, Union
from dataclasses import dataclass
from pathlib import Path

class Span:
    """Character-offset range locating a piece of text (used by ExtractedAnswer)."""
    start: int  # offset where the span begins
    end: int  # offset where the span ends — presumably exclusive; confirm upstream

class ByteStream:
    """Binary data stream for document content."""
    data: bytes  # raw binary payload

class SparseEmbedding:
    """Sparse vector representation with indices and values."""
    indices: List[int]  # positions of the non-zero dimensions
    values: List[float]  # magnitudes at the corresponding indices

class Breakpoint:
    """Breakpoint configuration for pipeline debugging."""
    # Placeholder stub: the concrete fields live in the haystack package itself.
    pass

class AgentBreakpoint:
    """Agent-specific breakpoint configuration."""
    # Placeholder stub: the concrete fields live in the haystack package itself.
    pass

class PipelineSnapshot:
    """Pipeline execution snapshot for resuming."""
    # Placeholder stub: the concrete fields live in the haystack package itself.
    pass

@dataclass
class ComponentInfo:
    """Metadata describing a pipeline component and its declared inputs/outputs."""
    name: str  # component name as registered in the pipeline
    type: str  # component type identifier
    inputs: Dict[str, Any]  # input definitions keyed by input name
    outputs: Dict[str, Any]  # output definitions keyed by output name

Install with Tessl CLI

npx tessl i tessl/pypi-haystack-ai

docs

agent-framework.md

core-framework.md

document-processing.md

document-stores.md

evaluation.md

index.md

prompt-building.md

retrieval.md

text-embeddings.md

text-generation.md

tile.json