or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

agent.md agentos.md eval.md exceptions.md guardrails.md index.md knowledge.md media.md memory.md models.md sessions.md storage.md team.md tools.md workflow.md
tile.json

docs/workflow.md

Workflow API

Step-based workflow orchestration with control flow.

Capabilities

Workflow Creation

from agno.workflow import Workflow, Step, Steps, Loop, Parallel, Condition

class Workflow:
    """Step-based workflow built from a list of steps.

    All constructor parameters are keyword-only.

    Args:
        name: Optional workflow name.
        description: Optional free-text description of the workflow.
        steps: Optional list of Step objects that make up the workflow.
        db: Optional BaseDb or AsyncBaseDb instance. How it is used is not
            shown in this stub (presumably persistence -- TODO confirm).
        **kwargs: Additional keyword arguments; not documented in this stub.

    NOTE(review): the Parallel Workflow usage example in this document passes
    a Parallel object inside ``steps``, which is wider than the List[Step]
    annotation here -- confirm the accepted element types.
    """
    def __init__(
        self,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        steps: Optional[List[Step]] = None,
        db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
        **kwargs
    ): ...

Step Control

class Step:
    """A single workflow step.

    ``agent``, ``team`` and ``executor`` may be passed positionally; ``name``
    and ``condition`` are keyword-only.

    Args:
        agent: Optional Agent for this step.
        team: Optional Team for this step.
        executor: Optional Agent or Team. How it relates to ``agent`` and
            ``team`` is not shown in this stub -- TODO confirm.
        name: Optional step name.
        condition: Optional Condition attached to this step.
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(
        self,
        agent: Optional[Agent] = None,
        team: Optional[Team] = None,
        executor: Optional[Union[Agent, Team]] = None,
        *,
        name: Optional[str] = None,
        condition: Optional[Condition] = None,
        **kwargs
    ): ...

class Steps:
    """Sequential steps container.

    Args:
        steps: The Step objects held by the container (required).
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(self, steps: List[Step], **kwargs): ...

class Loop:
    """Loop construct for iterations.

    Args:
        steps: The Step objects that form the loop body (required).
        max_iterations: Keyword-only iteration limit; defaults to 10.
        condition: Optional keyword-only callable. Its call signature and
            whether a truthy result continues or ends the loop are not shown
            in this stub -- TODO confirm.
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(
        self,
        steps: List[Step],
        *,
        max_iterations: int = 10,
        condition: Optional[Callable] = None,
        **kwargs
    ): ...

class Parallel:
    """Parallel execution of steps.

    Args:
        steps: The Step objects to execute in parallel (required).
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(self, steps: List[Step], **kwargs): ...

class Condition:
    """Conditional branching.

    Args:
        condition: Callable used to decide the branch (required). Its call
            signature is not shown in this stub; the usage example in this
            document passes a one-argument function returning a bool.
        if_true: Optional Step, presumably taken when ``condition`` is
            truthy -- TODO confirm.
        if_false: Optional Step, presumably taken when ``condition`` is
            falsy -- TODO confirm.
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(
        self,
        condition: Callable,
        if_true: Optional[Step] = None,
        if_false: Optional[Step] = None,
        **kwargs
    ): ...

class WorkflowAgent:
    """Agent with workflow execution capabilities.

    Args:
        workflow: The Workflow this agent executes (required).
        name: Optional keyword-only agent name.
        description: Optional keyword-only description.
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(
        self,
        workflow: Workflow,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        **kwargs
    ): ...

    def run(
        self,
        input: Union[str, List, Dict, Message, BaseModel],
        **kwargs
    ) -> Union[RunOutput, Iterator[RunOutputEvent]]:
        """Execute the workflow.

        Args:
            input: Workflow input; a str, list, dict, Message or BaseModel.
            **kwargs: Additional keyword arguments; not documented in this stub.

        Returns:
            Either a RunOutput or an iterator of RunOutputEvent. What selects
            between the two is not shown here (presumably a streaming option
            passed via kwargs -- TODO confirm).
        """

    async def arun(
        self,
        input: Union[str, List, Dict, Message, BaseModel],
        **kwargs
    ) -> Union[RunOutput, AsyncIterator[RunOutputEvent]]:
        """Async: Execute the workflow.

        Args:
            input: Workflow input; a str, list, dict, Message or BaseModel.
            **kwargs: Additional keyword arguments; not documented in this stub.

        Returns:
            Either a RunOutput or an async iterator of RunOutputEvent. What
            selects between the two is not shown here -- TODO confirm.
        """

class Router:
    """Router for conditional routing between steps.

    Args:
        routes: Mapping from a string key to the Step for that route
            (required). How a key is chosen at run time is not shown in this
            stub -- TODO confirm.
        default: Optional Step, presumably used when no route matches --
            TODO confirm.
        **kwargs: Additional keyword arguments; not documented in this stub.
    """
    def __init__(
        self,
        routes: Dict[str, Step],
        default: Optional[Step] = None,
        **kwargs
    ): ...

Usage Examples

Basic Workflow

from agno.agent import Agent
from agno.workflow import Workflow, Step
from agno.models.openai import OpenAIChat

# One agent per pipeline stage; each agent gets its own OpenAIChat instance.
researcher = Agent(name="Researcher", model=OpenAIChat(id="gpt-4"))
analyst = Agent(name="Analyst", model=OpenAIChat(id="gpt-4"))
writer = Agent(name="Writer", model=OpenAIChat(id="gpt-4"))

# Wrap each agent in a named step, in the order the workflow lists them.
research_step = Step(agent=researcher, name="research")
analyze_step = Step(agent=analyst, name="analyze")
write_step = Step(agent=writer, name="write")

# Assemble the three steps into a single workflow.
workflow = Workflow(
    name="Research Pipeline",
    steps=[research_step, analyze_step, write_step],
)

# Run the workflow on a text prompt and keep the returned result.
result = workflow.run("Research AI safety")

Conditional Workflow

from agno.workflow import Workflow, Step, Condition

def needs_more_research(output):
    """Return True when ``output.content`` contains "insufficient".

    The match is case-insensitive: the content is lower-cased before the
    substring check.
    """
    lowered_content = output.content.lower()
    return "insufficient" in lowered_content

# Build the pieces in the same order the nested form would evaluate them.
initial_step = Step(agent=researcher, name="initial_research")

# Branch attached to the analysis step: a deep-research step for the true
# case and nothing for the false case.
deep_research_step = Step(agent=researcher, name="deep_research")
research_gate = Condition(
    condition=needs_more_research,
    if_true=deep_research_step,
    if_false=None,
)
analysis_step = Step(agent=analyst, name="analysis", condition=research_gate)

workflow = Workflow(steps=[initial_step, analysis_step])

Parallel Workflow

from agno.workflow import Workflow, Parallel, Step

# Agents for the three parallel search branches and the final synthesis step.
# Agent and OpenAIChat are the names imported in the Basic Workflow example.
web_researcher = Agent(name="Web Researcher", model=OpenAIChat(id="gpt-4"))
paper_researcher = Agent(name="Paper Researcher", model=OpenAIChat(id="gpt-4"))
news_researcher = Agent(name="News Researcher", model=OpenAIChat(id="gpt-4"))
synthesizer = Agent(name="Synthesizer", model=OpenAIChat(id="gpt-4"))

workflow = Workflow(
    steps=[
        # The three search steps are grouped in a Parallel container.
        Parallel(steps=[
            Step(agent=web_researcher, name="web_search"),
            Step(agent=paper_researcher, name="paper_search"),
            Step(agent=news_researcher, name="news_search")
        ]),
        # Listed after the parallel group: combines the search results.
        Step(agent=synthesizer, name="synthesize")
    ]
)