Step-based workflow orchestration with control flow.
from agno.workflow import Workflow, Step, Steps, Loop, Parallel, Condition
class Workflow:
    """Step-based workflow definition (constructor signature stub).

    Args:
        name: Optional display name for the workflow.
        description: Optional free-text description.
        steps: Optional list of workflow elements. Accepts plain ``Step``
            objects as well as the control-flow containers (``Steps``,
            ``Loop``, ``Parallel``, ``Condition``, ``Router``).
        db: Optional synchronous or asynchronous database object.
            Presumably used for persistence -- confirm against the library.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        # Widened from List[Step]: the usage examples in this file place a
        # Parallel container directly in the step list. Quoted names avoid
        # evaluating classes that are declared later in the module.
        steps: Optional[
            List[Union["Step", "Steps", "Loop", "Parallel", "Condition", "Router"]]
        ] = None,
        db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
        **kwargs
    ): ...


class Step:
    """Single named unit of work inside a workflow (constructor signature stub).

    Args:
        agent: Optional agent attached to this step.
        team: Optional team attached to this step.
        executor: Optional agent or team given as a generic executor.
            NOTE(review): how ``agent``, ``team`` and ``executor`` take
            precedence over each other is not visible here -- confirm.
        name: Optional step name (keyword-only).
        condition: Optional ``Condition`` attached to the step (keyword-only).
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        agent: Optional[Agent] = None,
        team: Optional[Team] = None,
        executor: Optional[Union[Agent, Team]] = None,
        *,
        name: Optional[str] = None,
        condition: Optional[Condition] = None,
        **kwargs
    ): ...
class Steps:
    """Container that groups a list of steps to be run sequentially.

    Args:
        steps: The ``Step`` objects held by this container.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        steps: List[Step],
        **kwargs
    ):
        ...
class Loop:
    """Container that runs a list of steps repeatedly.

    Args:
        steps: The ``Step`` objects executed on each iteration.
        max_iterations: Iteration cap (keyword-only, defaults to 10).
        condition: Optional callable (keyword-only). Presumably decides when
            the loop stops; its exact call signature is not shown here.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        steps: List[Step],
        *,
        max_iterations: int = 10,
        condition: Optional[Callable] = None,
        **kwargs
    ):
        ...
class Parallel:
    """Container whose steps are meant to be executed in parallel.

    Args:
        steps: The ``Step`` objects to run side by side.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        steps: List[Step],
        **kwargs
    ):
        ...
class Condition:
    """Conditional branch between two optional steps.

    Args:
        condition: Callable used as the branching predicate.
        if_true: Optional step for the truthy branch (presumably run when
            the predicate returns a truthy value).
        if_false: Optional step for the falsy branch; may be ``None`` when
            no alternative is wanted.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        condition: Callable,
        if_true: Optional[Step] = None,
        if_false: Optional[Step] = None,
        **kwargs
    ):
        ...
class WorkflowAgent:
    """Agent wrapper that can execute a ``Workflow``.

    Only signatures are shown; the method bodies carry no implementation.
    """

    def __init__(
        self,
        workflow: Workflow,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        **kwargs
    ):
        # Signature stub: takes the workflow to drive plus an optional
        # keyword-only name and description.
        ...

    def run(
        self,
        input: Union[str, List, Dict, Message, BaseModel],
        **kwargs
    ) -> Union[RunOutput, Iterator[RunOutputEvent]]:
        """Run the workflow synchronously.

        Args:
            input: The run input -- a string, list, dict, ``Message`` or
                ``BaseModel`` instance.
            **kwargs: Additional run options; not enumerated in this stub.

        Returns:
            Either a single ``RunOutput`` or an iterator of
            ``RunOutputEvent`` objects, per the annotation. What selects
            between the two is not shown here.
        """

    async def arun(
        self,
        input: Union[str, List, Dict, Message, BaseModel],
        **kwargs
    ) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
        """Run the workflow asynchronously.

        Args:
            input: The run input -- a string, list, dict, ``Message`` or
                ``BaseModel`` instance.
            **kwargs: Additional run options; not enumerated in this stub.

        Returns:
            Either a single ``RunOutput`` or an async iterator of
            ``RunOutputEvent`` objects, per the annotation.
        """
class Router:
    """Routes between steps using a name-to-step mapping.

    Args:
        routes: Mapping from a string key to the ``Step`` it selects.
        default: Optional fallback step. Presumably used when no route
            matches -- confirm against the library.
        **kwargs: Additional keyword options; not enumerated in this stub.
    """

    def __init__(
        self,
        routes: Dict[str, Step],
        default: Optional[Step] = None,
        **kwargs
    ):
        ...


from agno.agent import Agent
from agno.workflow import Workflow, Step
from agno.models.openai import OpenAIChat
# One agent per pipeline stage; each agent gets its own model instance.
researcher = Agent(name="Researcher", model=OpenAIChat(id="gpt-4"))
analyst = Agent(name="Analyst", model=OpenAIChat(id="gpt-4"))
writer = Agent(name="Writer", model=OpenAIChat(id="gpt-4"))

# Chain the agents as three named steps: research -> analyze -> write.
workflow = Workflow(
    name="Research Pipeline",
    steps=[
        Step(agent=researcher, name="research"),
        Step(agent=analyst, name="analyze"),
        Step(agent=writer, name="write"),
    ],
)

# Start a run with a plain-string input.
result = workflow.run("Research AI safety")

from agno.workflow import Workflow, Step, Condition
def needs_more_research(output):
    """Return True when a step output says the research was insufficient.

    The check is a case-insensitive search for the word "insufficient" in
    ``output.content``.

    Args:
        output: Any object exposing a ``content`` attribute.

    Returns:
        bool: ``True`` if the content mentions "insufficient", else ``False``.
        Content that is ``None`` yields ``False`` instead of raising.
    """
    content = output.content
    if content is None:
        # No content produced: nothing signals that more research is needed.
        return False
    # Non-string content (e.g. a dict) is searched through its text form;
    # str() leaves real strings unchanged.
    return "insufficient" in str(content).lower()
# Conditional example: the "analysis" step carries a Condition whose
# predicate is needs_more_research.
workflow = Workflow(
    steps=[
        Step(agent=researcher, name="initial_research"),
        Step(
            agent=analyst,
            name="analysis",
            condition=Condition(
                condition=needs_more_research,
                if_true=Step(agent=researcher, name="deep_research"),
                if_false=None,  # no alternative step is supplied
            ),
        ),
    ],
)

from agno.workflow import Workflow, Parallel, Step
# Parallel example: three search steps grouped in a Parallel container,
# followed by one synthesis step.
# NOTE(review): web_researcher, paper_researcher, news_researcher and
# synthesizer are not defined in this snippet -- presumably Agent instances
# created elsewhere; confirm before running.
workflow = Workflow(
    steps=[
        Parallel(
            steps=[
                Step(agent=web_researcher, name="web_search"),
                Step(agent=paper_researcher, name="paper_search"),
                Step(agent=news_researcher, name="news_search"),
            ],
        ),
        Step(agent=synthesizer, name="synthesize"),
    ],
)