Comprehensive agent creation, execution, and management functionality.
Create AI agents with flexible configuration for models, tools, memory, knowledge, and more.
class Agent:
def __init__(
self,
*,
# Core settings
model: Optional[Union[Model, str]] = None,
name: Optional[str] = None,
description: Optional[str] = None,
instructions: Optional[Union[str, List[str], Callable]] = None,
# Tools
tools: Optional[Sequence[Union[Toolkit, Callable, Function, Dict]]] = None,
tool_call_limit: Optional[int] = None,
# Storage & Memory
db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
memory_manager: Optional[MemoryManager] = None,
enable_user_memories: bool = False,
add_memories_to_context: Optional[bool] = None,
# Knowledge & RAG
knowledge: Optional[Knowledge] = None,
add_knowledge_to_context: bool = False,
knowledge_filters: Optional[Union[Dict[str, Any], List[FilterExpr]]] = None,
# Session Management
session_id: Optional[str] = None,
user_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
add_session_state_to_context: bool = False,
# History
add_history_to_context: bool = False,
num_history_runs: Optional[int] = None,
num_history_messages: Optional[int] = None,
# Structured Output
output_schema: Optional[Type[BaseModel]] = None,
parse_response: bool = True,
# Streaming
stream: Optional[bool] = None,
stream_events: Optional[bool] = None,
# Reasoning
reasoning: bool = False,
reasoning_model: Optional[Union[Model, str]] = None,
# Hooks & Guardrails
pre_hooks: Optional[List[Union[Callable, BaseGuardrail]]] = None,
post_hooks: Optional[List[Union[Callable, BaseGuardrail]]] = None,
# Debug
debug_mode: bool = False,
**kwargs
): ...

Execute agents synchronously with optional streaming.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
session_id: Optional[str] = None,
user_id: Optional[str] = None,
images: Optional[Sequence[Image]] = None,
audio: Optional[Sequence[Audio]] = None,
videos: Optional[Sequence[Video]] = None,
files: Optional[Sequence[File]] = None,
output_schema: Optional[Type[BaseModel]] = None,
**kwargs
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
Execute the agent with the given input.
Parameters:
input: Text, messages, or structured input
stream: Enable streaming responses
session_id: Session identifier for conversation continuity
user_id: User identifier
images: Image inputs
audio: Audio inputs
videos: Video inputs
files: File inputs
output_schema: Pydantic model for structured output
**kwargs: Additional parameters
Returns:
RunOutput: Complete response if stream=False
Iterator: Stream of events if stream=True
"""

Execute agents asynchronously for high-performance workloads.
async def arun(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
session_id: Optional[str] = None,
user_id: Optional[str] = None,
**kwargs
) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
"""
Async version of run().
Parameters:
input: Text, messages, or structured input
stream: Enable streaming responses
session_id: Session identifier
user_id: User identifier
**kwargs: Additional parameters
Returns:
RunOutput: Complete response if stream=False
AsyncIterator: Async stream of events if stream=True
"""

Continue paused runs that require human input, confirmation, or external execution.
def continue_run(
self,
run_id: str,
input: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
*,
requirements: Optional[List[RunRequirement]] = None,
stream: Optional[bool] = None,
**kwargs
) -> Union[RunOutput, Iterator[RunOutputEvent]]:
"""
Continue a paused run after fulfilling requirements.
Parameters:
run_id: ID of run to continue
input: Optional input for continuation
requirements: Fulfilled requirements
stream: Enable streaming
**kwargs: Additional parameters
Returns:
RunOutput or Iterator: Continuation response
"""
async def acontinue_run(
self,
run_id: str,
input: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
*,
requirements: Optional[List[RunRequirement]] = None,
stream: Optional[bool] = None,
**kwargs
) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
"""Async version of continue_run()."""

Add and manage tools available to the agent.
def add_tool(self, tool: Union[Toolkit, Callable, Function, Dict]) -> None:
"""Add a single tool to the agent."""
def set_tools(self, tools: Sequence[Union[Toolkit, Callable, Function, Dict]]) -> None:
"""Replace all tools with a new list."""

Load, save, and manage agent sessions for conversation continuity.
def get_session(self, session_id: Optional[str] = None) -> Optional[AgentSession]:
"""Load an agent session from storage."""
async def aget_session(self, session_id: Optional[str] = None) -> Optional[AgentSession]:
"""Async: Load an agent session from storage."""
def save_session(self, session: AgentSession) -> None:
"""Save the agent session to storage."""
async def asave_session(self, session: AgentSession) -> None:
"""Async: Save the agent session to storage."""
def delete_session(self, session_id: str) -> None:
"""Delete a session from storage."""
async def adelete_session(self, session_id: str) -> None:
"""Async: Delete a session from storage."""
def rename(self, name: str, session_id: Optional[str] = None) -> None:
"""Rename the agent and save to storage."""

Get and update session state for maintaining context across runs.
def get_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
"""Get the session state."""
async def aget_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
"""Async: Get the session state."""
def update_session_state(
self,
session_state_updates: Dict[str, Any],
session_id: Optional[str] = None
) -> str:
"""
Update the session state.
Parameters:
session_state_updates: Dictionary of updates
session_id: Session to update
Returns:
str: Confirmation message
"""
async def aupdate_session_state(
self,
session_state_updates: Dict[str, Any],
session_id: Optional[str] = None
) -> str:
"""Async: Update the session state."""

Retrieve conversation history and messages from sessions.
def get_session_messages(
self,
session_id: Optional[str] = None,
last_n_runs: Optional[int] = None,
limit: Optional[int] = None,
skip_roles: Optional[List[str]] = None,
skip_statuses: Optional[List[RunStatus]] = None,
) -> List[Message]:
"""
Get all messages from a session with filtering options.
Parameters:
session_id: Session identifier
last_n_runs: Number of recent runs to include
limit: Maximum messages to return
skip_roles: Skip messages with these roles
skip_statuses: Skip messages with these statuses
Returns:
List[Message]: Filtered messages
"""
async def aget_session_messages(...) -> List[Message]:
"""Async: Get all messages from a session."""
def get_chat_history(
self,
session_id: Optional[str] = None,
last_n_runs: Optional[int] = None
) -> List[Message]:
"""Get user and assistant messages (chat history)."""
async def aget_chat_history(...) -> List[Message]:
"""Async: Get chat history."""

Retrieve previous run outputs from storage.
def get_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[RunOutput]:
"""Get a specific run output by ID."""
async def aget_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[RunOutput]:
"""Async: Get a specific run output by ID."""
def get_last_run_output(self, session_id: Optional[str] = None) -> Optional[RunOutput]:
"""Get the most recent run output."""
async def aget_last_run_output(self, session_id: Optional[str] = None) -> Optional[RunOutput]:
"""Async: Get the most recent run output."""

Access user memories for contextual conversations.
def get_user_memories(self, user_id: Optional[str] = None) -> Optional[List[UserMemory]]:
"""Get all memories for a user."""
async def aget_user_memories(self, user_id: Optional[str] = None) -> Optional[List[UserMemory]]:
"""Async: Get all memories for a user."""

Search the knowledge base for relevant information.
def get_relevant_docs_from_knowledge(
self,
query: str,
num_documents: Optional[int] = None,
filters: Optional[Union[Dict[str, Any], List[FilterExpr]]] = None,
**kwargs
) -> Optional[List[Union[Dict[str, Any], str]]]:
"""
Search knowledge base for relevant documents.
Parameters:
query: Search query
num_documents: Number of documents to return
filters: Filters for search
**kwargs: Additional parameters
Returns:
List of relevant documents
"""
async def aget_relevant_docs_from_knowledge(...):
"""Async: Search knowledge base."""

Interactive command-line and display utilities.
def print_response(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
markdown: Optional[bool] = None,
show_message: bool = True,
**kwargs
) -> None:
"""Run the agent and print the response to console."""
async def aprint_response(...) -> None:
"""Async: Run the agent and print the response."""
def cli_app(
self,
input: Optional[str] = None,
session_id: Optional[str] = None,
user_id: Optional[str] = None,
stream: bool = False,
markdown: bool = False,
**kwargs
) -> None:
"""Run an interactive CLI to chat with the agent."""
async def acli_app(...) -> None:
"""Async: Run an interactive CLI to chat with the agent."""

Additional utility methods.
def deep_copy(self, *, update: Optional[Dict[str, Any]] = None) -> Agent:
"""
Create a deep copy of the agent with optional updates.
Parameters:
update: Dictionary of fields to update
Returns:
Agent: New agent instance
"""
@staticmethod
def cancel_run(run_id: str) -> bool:
"""
Cancel a running agent execution.
Parameters:
run_id: ID of run to cancel
Returns:
bool: True if cancelled, False if not found
"""

@dataclass
class RunOutput:
    """Response object returned by Agent.run()."""

    # Identifiers tying this run to its agent, session, and user.
    run_id: Optional[str]
    agent_id: Optional[str]
    session_id: Optional[str]
    user_id: Optional[str]

    # Main response payload.
    content: Optional[Any]  # main response content
    content_type: str  # type of content
    messages: Optional[List[Message]]  # all messages in the conversation
    metrics: Optional[Metrics]  # performance metrics

    # Media outputs.
    images: Optional[List[Image]]
    audio: Optional[List[Audio]]
    videos: Optional[List[Video]]
    files: Optional[List[File]]

    # State and metadata.
    session_state: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Any]]

    # Status and control.
    status: RunStatus  # running, completed, paused, cancelled
    requirements: Optional[List[RunRequirement]]  # HITL requirements

    # Convenience status properties.
    @property
    def is_paused(self) -> bool: ...

    @property
    def is_cancelled(self) -> bool: ...

# Union of all event types
RunOutputEvent = Union[
RunStartedEvent,
RunContentEvent,
RunCompletedEvent,
RunErrorEvent,
RunPausedEvent,
ToolCallStartedEvent,
ToolCallCompletedEvent,
# ... more event types
]
# Key event for streaming content
@dataclass
class RunContentEvent:
event: str # "RunContent"
content: Optional[Any] # Streaming content
content_type: strclass Message(BaseModel):
"""Message in the conversation"""
role: str # "system", "user", "assistant", "tool"
content: Optional[Union[List[Any], str]]
# Tool-related
tool_calls: Optional[List[Dict[str, Any]]]
tool_call_id: Optional[str]
# Media
images: Optional[Sequence[Image]]
audio: Optional[Sequence[Audio]]
videos: Optional[Sequence[Video]]
files: Optional[Sequence[File]]@dataclass
class AgentSession:
    """Agent session record persisted in the database."""

    session_id: str
    agent_id: Optional[str]
    user_id: Optional[str]
    session_data: Optional[Dict[str, Any]]  # session state and metadata
    runs: Optional[List[RunOutput]]  # all runs in the session
    summary: Optional[SessionSummary]  # session summary
    created_at: Optional[int]  # presumably epoch seconds — confirm against storage layer
    updated_at: Optional[int]

from agno.agent import Agent
from agno.models.openai import OpenAIChat

# Example: a minimal agent — one model, no tools. run() returns a RunOutput
# whose .content holds the model's reply.
agent = Agent(
    name="Assistant",
    model=OpenAIChat(id="gpt-4"),
    description="A helpful AI assistant",
)
response = agent.run("What is machine learning?")
print(response.content)

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools

# Example: a tool-using agent — DuckDuckGo search is exposed so the model
# can look up current information before answering.
agent = Agent(
    name="Research Assistant",
    model=OpenAIChat(id="gpt-4"),
    tools=[DuckDuckGoTools()],
    instructions=["Search the web for current information"],
)
response = agent.run("What are the latest developments in AI?")
print(response.content)

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.db.sqlite import SqliteDb
from agno.knowledge import Knowledge
from agno.vectordb.pgvector import PgVector
from agno.knowledge.embedder import OpenAIEmbedder

# Example: a persistent agent — SQLite session storage, a pgvector-backed
# knowledge base, and user memories, all injected into the model context.
agent = Agent(
    name="Knowledge Agent",
    model=OpenAIChat(id="gpt-4"),
    db=SqliteDb(db_file="agent.db"),
    knowledge=Knowledge(
        vector_db=PgVector(
            table_name="documents",
            db_url="postgresql://user:pass@localhost:5432/db",
        ),
        embedder=OpenAIEmbedder(),
    ),
    add_history_to_context=True,
    add_knowledge_to_context=True,
    enable_user_memories=True,
)

# First conversation: state a fact within a session.
agent.run("My favorite color is blue", session_id="user-123")

# Later conversation in the same session — the agent recalls the fact.
response = agent.run("What is my favorite color?", session_id="user-123")

from agno.agent import Agent
from agno.models.openai import OpenAIChat

# Example: streaming — with stream=True, run() yields events instead of a
# single RunOutput; chunks that carry a .content hold incremental text.
agent = Agent(
    name="Streaming Assistant",
    model=OpenAIChat(id="gpt-4"),
    stream=True,
)
for chunk in agent.run("Write a short story"):
    if hasattr(chunk, 'content') and chunk.content:
        print(chunk.content, end="", flush=True)

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from pydantic import BaseModel


# Example: structured output — supplying output_schema makes the agent parse
# the model's reply into the Pydantic model, available as response.content.
class MovieReview(BaseModel):
    title: str
    rating: int
    summary: str


agent = Agent(
    name="Reviewer",
    model=OpenAIChat(id="gpt-4"),
    output_schema=MovieReview,
)
response = agent.run("Review the movie Inception")
review: MovieReview = response.content
print(f"{review.title}: {review.rating}/10")