LLM framework to build customizable, production-ready LLM applications.
---
Build autonomous agents that can use tools, maintain conversation state, and execute complex multi-step workflows. Haystack's agent framework provides components for creating intelligent agents with function calling capabilities and persistent state management.
Create intelligent agents that can reason, make decisions, and execute actions using available tools.
class Agent:
def __init__(
self,
generator: Any,
tools: Optional[List[Tool]] = None,
toolset: Optional[Toolset] = None,
system_prompt: Optional[str] = None,
max_iterations: int = 10
) -> None:
"""
Initialize an autonomous agent.
Args:
generator: Chat generator component (e.g., OpenAIChatGenerator)
tools: List of individual tools available to the agent
toolset: Organized collection of tools
system_prompt: System instructions for the agent
max_iterations: Maximum number of reasoning iterations
"""
def run(
self,
messages: List[ChatMessage]
) -> Dict[str, List[ChatMessage]]:
"""
Run agent conversation with tool execution capabilities.
Args:
messages: List of chat messages (conversation history)
Returns:
Dictionary with 'messages' key containing updated conversation
"""Execute tools and functions called by language models with proper error handling and result formatting.
class ToolInvoker:
def __init__(
self,
tools: Optional[List[Tool]] = None,
toolset: Optional[Toolset] = None,
raise_on_failure: bool = True
) -> None:
"""
Initialize tool invoker for executing function calls.
Args:
tools: List of tools available for invocation
toolset: Organized toolset for invocation
raise_on_failure: Whether to raise exceptions on tool failures
"""
def run(
self,
tool_calls: List[ToolCall]
) -> Dict[str, List[ToolCallResult]]:
"""
Execute tool calls and return results.
Args:
tool_calls: List of tool calls to execute
Returns:
Dictionary with 'tool_results' key containing execution results
"""Define, organize, and manage tools that agents can use.
class Tool:
    """A callable wrapped with the metadata a language model needs to use it."""

    def __init__(
        self,
        name: str,
        description: str,
        function: Callable,
        parameters: Dict[str, Any]
    ) -> None:
        """
        Build a tool around an existing Python callable.

        Args:
            name: Unique tool name
            description: Description of what the tool does
            function: Python function to execute
            parameters: JSON schema for function parameters
        """

    @classmethod
    def from_function(
        cls,
        function: Callable,
        name: Optional[str] = None,
        description: Optional[str] = None
    ) -> "Tool":
        """
        Alternate constructor: derive a tool directly from a function.

        Args:
            function: Python function to wrap as a tool
            name: Optional custom name (defaults to function name)
            description: Optional custom description (defaults to docstring)

        Returns:
            Tool instance wrapping the function
        """

    def invoke(self, **kwargs) -> Any:
        """
        Call the wrapped function with the supplied keyword arguments.

        Args:
            **kwargs: Arguments to pass to the tool function

        Returns:
            Result of tool execution
        """
@tool
def my_custom_tool(param1: str, param2: int = 10) -> str:
    """
    Example tool built with the @tool decorator.

    The decorator turns this function into a Tool: the signature becomes the
    parameter schema and this docstring becomes the tool description.

    Args:
        param1: First parameter description
        param2: Second parameter with default value

    Returns:
        Tool execution result
    """
    # NOTE(review): the original sample had no body (it implicitly returned
    # None despite the -> str annotation); return something concrete so the
    # example is a working tool.
    return f"Processed {param1} with {param2}"
class Toolset:
def __init__(
self,
tools: List[Tool]
) -> None:
"""
Initialize a collection of tools.
Args:
tools: List of tools to include in the toolset
"""
def add_tool(self, tool: Tool) -> None:
"""Add a tool to the toolset."""
def get_tool(self, name: str) -> Optional[Tool]:
"""Get a tool by name."""
def list_tools(self) -> List[str]:
"""List all tool names."""Manage agent state and conversation context across multiple interactions.
class State:
    """Key-value store that persists agent context between interactions."""

    def __init__(
        self,
        data: Optional[Dict[str, Any]] = None
    ) -> None:
        """
        Create a state store.

        Args:
            data: Initial state data dictionary
        """

    def get(self, key: str, default: Any = None) -> Any:
        """
        Look up a stored value.

        Args:
            key: State key to retrieve
            default: Default value if key not found

        Returns:
            State value or default
        """

    def set(self, key: str, value: Any) -> None:
        """
        Store a single value.

        Args:
            key: State key to set
            value: Value to store
        """

    def update(self, data: Dict[str, Any]) -> None:
        """
        Merge several key-value pairs into the state at once.

        Args:
            data: Dictionary of state updates
        """

    def clear(self) -> None:
        """Remove all stored state data."""

    def to_dict(self) -> Dict[str, Any]:
        """Return the full state as a plain dictionary."""

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
# Fix: the `tool` decorator is used below but was never imported
from haystack.tools import Tool, tool
from haystack.dataclasses import ChatMessage, ChatRole
from haystack.utils import Secret
import requests  # NOTE(review): unused here; a real get_weather would call an HTTP API
from datetime import datetime

# Define tools for the agent
@tool
def get_weather(city: str) -> str:
    """
    Get current weather for a city.

    Args:
        city: Name of the city

    Returns:
        Weather description
    """
    # Simulate weather API call
    return f"Weather in {city}: Sunny, 22°C"

@tool
def get_current_time() -> str:
    """
    Get current date and time.

    Returns:
        Current timestamp
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Create tools list
tools = [get_weather, get_current_time]

# Initialize chat generator
chat_generator = OpenAIChatGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
    tools=tools,
)

# Create agent
agent = Agent(
    generator=chat_generator,
    tools=tools,
    system_prompt="You are a helpful assistant that can check weather and time.",
)

# Run agent conversation
messages = [
    ChatMessage(
        content="What's the weather like in Paris and what time is it now?",
        role=ChatRole.USER,
    )
]
result = agent.run(messages=messages)
for message in result["messages"]:
    print(f"{message.role.value}: {message.content}")

from haystack.tools import Toolset
import json
import sqlite3  # NOTE(review): unused in this snippet; shown for a real DB-backed version

# Define a more complex toolset
@tool
def search_database(query: str, table: str) -> str:
    """
    Look up records in the company database.

    Args:
        query: SQL-like search query
        table: Database table to search

    Returns:
        JSON string with search results
    """
    # Simulated lookup: always returns the same two sample contacts
    rows = [
        {"id": 1, "name": "John Doe", "email": "john@example.com"},
        {"id": 2, "name": "Jane Smith", "email": "jane@example.com"}
    ]
    return json.dumps(rows)
@tool
def send_email(to: str, subject: str, body: str) -> str:
    """
    Pretend to send an email and report what was sent.

    Args:
        to: Recipient email address
        subject: Email subject
        body: Email body content

    Returns:
        Confirmation message
    """
    # Simulate email sending
    return f"Email sent to {to} with subject '{subject}'"
@tool
def calculate_math(expression: str) -> str:
    """
    Safely evaluate an arithmetic expression.

    Args:
        expression: Mathematical expression to evaluate. Supports numbers,
            + - * / // % ** binary operators, unary +/-, and parentheses.

    Returns:
        Calculation result as a string, or "Error: ..." for invalid input
    """
    import ast
    import operator

    # Whitelist of permitted AST operators. Anything outside it is rejected,
    # so model-supplied input cannot execute arbitrary code — unlike the
    # previous eval()-based implementation.
    ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
        ast.UAdd: operator.pos,
        ast.USub: operator.neg,
    }

    def _eval(node):
        # Recursively evaluate only numeric constants and whitelisted operators.
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    try:
        result = _eval(ast.parse(expression, mode="eval").body)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"
# Create toolset
business_toolset = Toolset([
    search_database,
    send_email,
    calculate_math,
])

# Create business agent
business_agent = Agent(
    generator=OpenAIChatGenerator(
        api_key=Secret.from_env_var("OPENAI_API_KEY"),
        model="gpt-4",
        tools=business_toolset.tools,
    ),
    toolset=business_toolset,
    system_prompt="""
You are a business assistant that can:
1. Search the company database
2. Send emails to contacts
3. Perform calculations
Always be professional and helpful.
""",
)

# Complex multi-step task
messages = [
    ChatMessage(
        content="Find John Doe in the database, calculate 15% of 1000, and send him an email about the discount.",
        role=ChatRole.USER,
    )
]
result = business_agent.run(messages=messages)

from haystack.components.agents import State
# Create stateful agent
class StatefulAgent:
    """Wraps an Agent with persistent State plus a running conversation log."""

    def __init__(self, agent: Agent):
        self.agent = agent
        self.state = State()
        self.conversation_history = []

    def chat(self, user_input: str) -> str:
        """Send one user message through the agent, tracking state and history."""
        # Record the user turn
        self.conversation_history.append(
            ChatMessage(content=user_input, role=ChatRole.USER)
        )

        # Bump the per-session interaction counter kept in state
        count = self.state.get("conversation_count", 0) + 1
        self.state.set("conversation_count", count)

        # Surface accumulated state to the model as a system message
        context_message = ChatMessage(
            content=f"This is conversation #{count}. "
            f"Previous context: {self.state.to_dict()}",
            role=ChatRole.SYSTEM
        )

        # Run the agent over the system context plus full history
        result = self.agent.run(
            messages=[context_message] + self.conversation_history
        )

        # Record the assistant turn and remember the latest topic
        reply = result["messages"][-1]
        self.conversation_history.append(reply)
        self.state.set("last_topic", user_input[:50])
        return reply.content
# Usage
stateful_agent = StatefulAgent(agent)
response1 = stateful_agent.chat("My name is Alice")
response2 = stateful_agent.chat("What's my name?")  # Should remember Alice
response3 = stateful_agent.chat("What did we talk about before?")  # Should reference history

# Fix: this import had been fused into the comment above, leaving the next
# example without ToolInvoker in scope.
from haystack.components.tools import ToolInvoker
from haystack import Pipeline
from haystack.dataclasses import ToolCall
# Create tool invocation pipeline
tool_pipeline = Pipeline()
# Add tool invoker
tool_invoker = ToolInvoker(
tools=[get_weather, get_current_time],
raise_on_failure=False
)
tool_pipeline.add_component("tool_invoker", tool_invoker)
# Create tool calls
tool_calls = [
ToolCall(
tool_name="get_weather",
arguments={"city": "London"},
id="call_1"
),
ToolCall(
tool_name="get_current_time",
arguments={},
id="call_2"
)
]
# Execute tools
result = tool_pipeline.run({
"tool_invoker": {"tool_calls": tool_calls}
})
# Process results
for tool_result in result["tool_invoker"]["tool_results"]:
print(f"Tool {tool_result.origin.tool_name}: {tool_result.result}")
if tool_result.error:
print(f"Error occurred: {tool_result.error}")@tool
def risky_operation(value: int) -> str:
    """
    Perform an operation that might fail.

    Args:
        value: Input value; must be non-negative

    Returns:
        Success message containing the doubled value

    Raises:
        ValueError: If value is negative
    """
    if value >= 0:
        return f"Success: {value * 2}"
    raise ValueError("Value must be positive")
# Create agent with error handling
class RobustAgent:
    """Agent wrapper that turns run() exceptions into a structured result."""

    def __init__(self, agent: Agent):
        self.agent = agent
        self.error_count = 0

    def safe_run(self, messages: List[ChatMessage]) -> Dict[str, Any]:
        """Run the wrapped agent; report errors instead of raising."""
        try:
            return {"success": True, "result": self.agent.run(messages=messages)}
        except Exception as e:
            # Track failures so callers can spot repeated problems
            self.error_count += 1
            return {
                "success": False,
                "error": str(e),
                "error_count": self.error_count,
            }
# Usage with error handling
robust_agent = RobustAgent(
    Agent(
        generator=chat_generator,
        tools=[risky_operation],
        system_prompt="Handle errors gracefully and inform the user.",
    )
)

messages = [
    ChatMessage(
        content="Try risky_operation with value -5",
        role=ChatRole.USER,
    )
]

result = robust_agent.safe_run(messages)
if result["success"]:
    print("Agent succeeded:", result["result"])
else:
    print("Agent failed:", result["error"])

# Create workflow agent
class WorkflowAgent:
    """Runs a sequence of agents, feeding each step's output into the next."""

    def __init__(self):
        self.steps = []
        self.current_step = 0
        self.context = {}

    def add_step(self, step_name: str, agent: Agent):
        """Append a named step to be executed by *agent*."""
        self.steps.append({"name": step_name, "agent": agent})

    def execute_workflow(self, initial_input: str) -> Dict[str, Any]:
        """Execute all steps in order; return each step's raw agent result."""
        results = {}
        task = initial_input
        for index, step in enumerate(self.steps, start=1):
            print(f"Executing step {index}: {step['name']}")
            prompt = f"Context: {self.context}\nTask: {task}"
            step_result = step["agent"].run(
                messages=[ChatMessage(content=prompt, role=ChatRole.USER)]
            )
            results[step["name"]] = step_result
            # The last assistant message becomes the next step's task
            task = step_result["messages"][-1].content
            # Remember what each step saw and produced
            self.context[step["name"]] = {"input": prompt, "output": task}
        return results
# Create workflow
workflow = WorkflowAgent()

# Add workflow steps
workflow.add_step("research", Agent(
    generator=chat_generator,
    tools=[search_database],
    system_prompt="Research the given topic and provide key information.",
))
workflow.add_step("analysis", Agent(
    generator=chat_generator,
    tools=[calculate_math],
    system_prompt="Analyze the research data and provide insights.",
))
workflow.add_step("communication", Agent(
    generator=chat_generator,
    tools=[send_email],
    system_prompt="Create a professional communication based on the analysis.",
))

# Execute workflow
workflow_results = workflow.execute_workflow("Analyze customer satisfaction data")

# Create multiple specialized agents
class MultiAgentSystem:
    """Round-robin conversation between several named agents."""

    def __init__(self):
        self.agents = {}

    def add_agent(self, name: str, agent: "Agent"):
        """Register *agent* under *name*.

        The annotation is a string forward reference so this snippet does not
        depend on import order.
        """
        self.agents[name] = agent

    def facilitate_conversation(self, topic: str, max_turns: int = 5) -> List[Dict[str, str]]:
        """
        Let the registered agents discuss *topic*, taking turns in insertion order.

        Args:
            topic: Discussion topic used to seed the first turn
            max_turns: Total number of turns across all agents

        Returns:
            List of {"speaker", "message", "turn"} entries; empty if no agents
        """
        # Fix: the original raised IndexError when no agents were registered
        if not self.agents:
            return []

        conversation = []
        agent_names = list(self.agents.keys())
        current_speaker = agent_names[0]
        current_message = f"Let's discuss: {topic}"

        for turn in range(max_turns):
            agent = self.agents[current_speaker]

            # Provide the most recent exchanges as context
            context = "\n".join(
                f"{entry['speaker']}: {entry['message']}"
                for entry in conversation[-3:]  # Last 3 exchanges
            )
            messages = [
                ChatMessage(
                    content=f"Context: {context}\n\nRespond to: {current_message}",
                    role=ChatRole.USER,
                )
            ]
            result = agent.run(messages=messages)
            response = result["messages"][-1].content

            conversation.append({
                "speaker": current_speaker,
                "message": response,
                "turn": turn + 1,
            })

            # Rotate to the next agent
            current_index = agent_names.index(current_speaker)
            current_speaker = agent_names[(current_index + 1) % len(agent_names)]
            current_message = response

        return conversation
# Create multi-agent system
mas = MultiAgentSystem()
mas.add_agent("researcher", Agent(
    generator=chat_generator,
    system_prompt="You are a researcher. Focus on facts and data.",
))
mas.add_agent("critic", Agent(
    generator=chat_generator,
    system_prompt="You are a critic. Challenge assumptions and ask hard questions.",
))
mas.add_agent("synthesizer", Agent(
    generator=chat_generator,
    system_prompt="You synthesize different viewpoints into coherent conclusions.",
))

# Run multi-agent conversation
conversation = mas.facilitate_conversation("The impact of AI on education", max_turns=6)
for entry in conversation:
    print(f"{entry['speaker']} (Turn {entry['turn']}): {entry['message']}")

from typing import Callable, Dict, Any, List, Optional, Union
from haystack.dataclasses import ChatMessage, ToolCall, ToolCallResult
class AgentStep:
"""Represents a single step in agent reasoning."""
thought: str
action: Optional[ToolCall]
observation: Optional[str]
class AgentTrace:
"""Complete trace of agent execution."""
steps: List[AgentStep]
final_answer: str
total_time: float
class ToolDefinition:
"""Tool definition for function calling."""
name: str
description: str
parameters: Dict[str, Any]
function: CallableInstall with Tessl CLI
npx tessl i tessl/pypi-haystack-ai