Use when the user needs workflow orchestration such as branching, concurrency, approvals, waiting and resume, runtime stream, restart-safe execution, mixed sync/async function or module orchestration, event-driven fan-out, process-clarity refactors that make stages explicit, performance-oriented refactors that collapse split requests, or workflow definitions and chunk-level runtime metadata that must stay visible for debugging and visualization. The user does not need to say TriggerFlow explicitly.
54
60%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Passed
No known issues
Optimize this skill with Tessl
npx tessl skill review --optimize ./skills/agently-triggerflow/SKILL.mdUse this skill when the solution clearly needs orchestration semantics rather than one request family.
The user does not need to say TriggerFlow or Agently. Scenario language such as resumable approval flow, branching automation, output-fan-out refactor, mixed sync/async pipeline, process-clarity refactor, or draft-review-revise pipeline should still route here once orchestration is clearly the owner layer.
agent.create_task(...) when the app needs one Agent-owned business
task loop where the model owns planning, verification, and replan; it returns
a task-strategy AgentExecution draft, not a separate public AgentTask
handle. Use agent.create_task_loop(...) only when the code should make the
task-loop strategy explicit; it still returns an AgentExecution draft. Use
TriggerFlow directly when the application owns explicit stages, branching,
pause/resume, or restart topologyresult = execution.get_result(), result.get_text(),
result.get_data(), result.get_meta(),
execution.get_async_generator(), execution.async_get_meta()) and surface task refs or
meta.stream_kind=="snapshot" items as compact intermediate state captures;
enable options={"agent_task": {"stream_progress": True}} only when the
example or host needs natural-language operator updates; omit
progress_model_key for template progress with no model requests, or set
progress_model_key to run a separate background model that summarizes only
existing snapshots/task metadata without adding main-loop fields or latency;
prove replan behavior from stream verification/replan events and snapshots
rather than hiding the proof in a local Python loop; mocked business systems
may supply defective facts or conflicting source data, but must not return
pass/fail verdicts or deterministic quality judgmentsclose() / async_close() for completion and cleanupcreate_execution(concurrency=N) and execution.set_concurrency(N) as an execution-wide handler dispatch budget, including nested dispatch from chunk continuations and data.async_emit(...); use operator-local batch(..., concurrency=...) or for_each(..., concurrency=...) only for local fan-out capsexecution.result when services, UIs, stream consumers, or intervention-aware workflows need multiple views of one execution outcome, such as state, compatibility final result, interventions, and metadata; use execution.close() / execution.async_close() for close snapshots.intervention_point(...) boundaries so execution creation can infer planned mode, or create the execution with intervention_mode="auto" for boundary policy insertion; chunks read inserted context with data.get_interventions(...) and explicitly audit usage with data.async_mark_intervention_consumed(...), relying on the chunk-name consumer default unless another consumer identity is clearerpause_for(..., resume_to="next" | "self" | {"event": ...}); treat it as a durable graph interrupt, not Python coroutine stack persistence; teach model-decided autonomous interrupts with model-owned pause_for(..., resume_to="self"), where the resumed chunk handles data.is_resume and the default max_resumes=1 prevents unbounded self-replay; teach prearranged approval gates with an explicit pause chunk plus when(...)pause_for(type="policy_approval", ...)
interrupts that resume through continue_with(...)pause_for(...) behind hidden execution sugar such as flow.start() or flow-level runtime stream helpers; create an explicit execution handle and consume get_pending_interrupts() / continue_with(...)close() / async_close() rejects pending interrupts by default, and pending_interrupts="cancel" must be chosen deliberately when abandoning waitsemit_nowait(...) / async_emit_nowait(...) when a chunk must fan out without blocking the current handler, and rely on execution close to drain registered tasksawait execution.async_close() to drain registered tasks and collect the
close snapshot. Do not add while True status polling loops such as
execution.result.get_state("status") unless a chunk explicitly owns and
updates that status contractdata.emit(...), data.async_emit(...), data.emit_nowait(...), and data.async_emit_nowait(...) to inherit the current TriggerFlow runtime scope; do not assume unrelated external emits can be paired by when(..., mode="and") unless the host routes them through one scoped flow stage or carries explicit correlation in the payloadget_state(...) / set_state(...) instead of legacy runtime-data helpers in new examplesexecution.save() is a versioned top-level
execution snapshot with durable TriggerFlow progress, flow definition
fingerprint validation, interrupt/resume ledgers, resource requirements,
lease metadata, and managed execution environment requirements. TriggerFlow
core is process-stateless between save/load; live runtime_resources,
clients, callbacks, tasks, semaphores, stateful sessions, and Python
coroutine frames are not serialized. runtime_resources is only the
process-local mount point for live objects that the host has already created,
restored, and validated. If a resource carries state, the external system
that owns it must persist a state ref/version/lease or fence token and a
resolver/provider must validate it before load is ready. Declare future
resources with flow.declare_resource_requirement(...) or
execution.declare_resource_requirement(...), inspect or restore with
execution.inspect_load(...) / execution.async_load(...), pass a stable
resume_request_id to continue_with(...) for external callbacks, and
persist through a snapshot store that implements put_snapshot(...) while
the production store owns atomic claim, lease enforcement, conflict handling,
outbox ordering, and external side-effect idempotency. Fail-closed pending
resolvers or pending managed execution environments should be treated as
status="pending_resources" and ready=False until async_load(...)
resolves and validates live resources. Late callbacks delivered to an
expired execution-local lease fail fast before resume acceptance without
writing resume ledger entries; the reclaimed worker should load or claim
first and then use the same stable resume_request_id. For long-running
executions, use execution.set_compaction_policy(...) when snapshot writes
should automatically run a host-owned reducer and store large payloads behind
provider artifact refs; TriggerFlow records compaction facts, retained
anchors, and load read bounds, while Workspace or an enterprise provider owns
artifact storage and retention anchor persistence. For framework-level
validation, run examples/trigger_flow/durable_recovery.py when available; it
proves Workspace-backed snapshot load and duplicate callback idempotency
without pretending a production approval transport exists. Use
examples/trigger_flow/fastapi_sqlite_exchange_provider.py when service-level
provider replacement should be shown: it packages the flow as a module-level
TriggerFlow(...) object with top-level .to(...) / .when(...) wiring,
stores top-level execution snapshots in SQLite, and routes approval waits
through a SQLite ExecutionExchangeProvider exposed by FastAPI without
claiming production distributed guarantees. If a snapshot
fingerprint is missing or does not match the current flow definition,
inspect_load(...) reports invalid_snapshot and load(...) rejects the
snapshotTriggerFlow(...) object, module-level chunk
handlers, and module-level .to(...) / .when(...) wiring. Service modules
should import that flow object and start executions from it. Normal Python
imports execute the module body once per process for the same module name;
TriggerFlow duplicate-definition protection is only the second line of
defense when application code explicitly runs the same wiring again on the
same flow object. Create live dependencies in host-owned factories or
importable resolvers, and use runtime_resources only as the final
process-local attachment point for already-created live objects. Use a
build_flow(...) helper only when the application genuinely needs multiple
configured flow instances or test isolation, not as the default service shapeagently-dynamic-task; TaskDAG is the DAG foundation capability, Dynamic Task is the convenience facade over it, and TriggerFlow is the execution substrate rather than the facade APIwhen(...) + emit_nowait(...) as the native signal-driven pattern for fan-out, loops, side branches, and dependency joins; definition idempotence must not be confused with runtime signal deduplicationflow.when(["task_a_done", "task_b_done"], mode="and").to(...) and have
upstream chunks call data.emit_nowait("task_a_done", payload). Do not
replace joins with sleeps, polling loops, local completed sets, or
pause_for(..., resume_to="self") unless the workflow is genuinely waiting
for external input across process timeflow.to(handler, name=...), dependent tasks attach with
flow.when(dep_signals, mode="and").to(handler), and task handlers emit their
own completion signals. Do not implement a generic local scheduler that scans
task_status, completed, or dependency lists to decide which task is ready;
that hides the DAG from TriggerFlow and defeats graph visibility. Do not pass
lambda data: async_task_handler(data, task) to flow.to(...); that registers
a sync lambda whose return coroutine may not be awaited. Use a normal factory
that returns async def handler(data): ... and register that handlerauto_allow=True; explicit host configure_skill_capabilities(...) policy
still wins and Skill metadata alone is not a capability grantagently-request when one workflow step needs model setup, prompt contracts, structured output, response reuse, session behavior, or retrievalWhen generating or editing Python code, use the actual Agently API shape:
from agently import Agent, Agently, TriggerFlow, TriggerFlowRuntimeData, Workspace
agent = Agent()
factory_agent = Agently.create_agent()
flow = TriggerFlow(name="workflow-name")
factory_flow = Agently.create_trigger_flow("factory-workflow")
shared_workspace = Workspace("./.agently/workflows/shared")Agent() / Agently.create_agent() and TriggerFlow() /
Agently.create_trigger_flow(...) are both valid first-class creation styles.
Workspace(...) / Agently.create_workspace(...) are both valid first-class
Workspace creation styles. Use one style consistently in an example unless
showing the API equivalence.
when, emit_nowait, and pause_for are not top-level imports from
agently. They are methods on flow or runtime data objects:
flow.when(...).to(handler, name="...")data.emit_nowait(...)await data.async_pause_for(...)execution = flow.create_execution(auto_close=False),
await execution.async_start(input_value) using a positional start value, and
snapshot = await execution.async_close()flow.create_execution() binds an execution-scoped lazy Workspace by default;
pass workspace=False to opt out, or
flow.create_execution(workspace=shared_workspace) when an application-owned
Workspace should be shared with Agents, service workers, or other executionsexecution.async_start(input_value=...); pass the start value
positionallyexecution.async_start("start") emits a custom "start" event.
It starts the execution with "start" as the start input. Use flow.to(...)
for start-bound chunks, or explicitly call await execution.async_emit("start", payload)
after start when a custom event is intentionalasync_close() / close() returns the close snapshot as a dict. Read it as
snapshot["key"] or inspect execution state through
execution.result.get_state("key"); do not write snapshot.state,
snapshot.pending_interrupts, or execution.result.stateTriggerFlow(name="...") or unnamed flows with
TriggerFlow(). Do not write TriggerFlow("name"); the first positional
argument is not the workflow name in the current APIDo not write from agently import when, emit_nowait, pause_for, and do not use
@flow.when(...) as a decorator.
Do not treat Dynamic Task as TriggerFlow syntax. If a model or application
submits a DAG as data, route to agently-dynamic-task and use
Agently.create_dynamic_task(...), TaskDAGValidator, and TaskDAGExecutor
there. TriggerFlow remains the substrate for stable workflow definitions that
the developer owns in code.
asyncio.create_task(data.async_emit(...)) as the default nowait pattern when execution-managed emit_nowait(...) is available.end(), get_result(), or set_result() as the default lifecycle path for new TriggerFlow codeexecution.result can expose the needed state/final-result/intervention/meta viewpause_for(...) / continue_with(...) when the workflow must stop for external inputintervene(...) as emit(...), graph mutation, input rewriting, chunk cancellation, or replay of completed chunksget_runtime_data(...) / set_runtime_data(...) in new guidance when get_state(...) / set_state(...) communicates the same intentruntime.show_deprecation_warnings=False as a migration substitute; it is only a production noise-control settingawait data.async_pause_for(...) surviving a process restart; put post-resume logic in the downstream chunk, data.is_resume branch, or explicit resume event handlerruntime_resources would keep the handler reusable, testable, and export-friendlyreferences/overview.mdreferences/runtime-intervention.mdreferences/stream-bridge.mdreferences/devtools-graph.mdb88efd8
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.