Control Flow & Edges¶
Control flow in PyAgenity is explicit: you wire deterministic edges when constructing the graph, or emit a `Command` at runtime to jump. This page explains edges, conditional routing, recursion limits, interrupts, and stop requests.
Edge Types¶
Mechanism | When Defined | Purpose
---|---|---
`add_edge(from, to)` | Build time | Linear / deterministic progression
`add_conditional_edges(node, condition, map)` | Build time | Declarative branching based on a state-derived label
`Command(goto=...)` | Runtime | Imperative jump chosen inside node logic

Even when using `Command`, having a fallback static edge can provide safety if the command returns `None`.
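A minimal sketch of that safety net (the node and edge names here are illustrative, not part of the API):

```python
# Hypothetical node that may return Command(goto=...) at runtime.
graph.add_node("AFTER_TOOL", after_tool)
# Static fallback edge: followed when the node does not redirect execution.
graph.add_edge("AFTER_TOOL", "SUMMARIZE")
```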
Basic Edges¶
graph.add_node("A", step_a)
graph.add_node("B", step_b)
graph.add_edge("A", "B")
graph.add_edge("B", END)
The runtime tracks the current node name in `execution_meta.current_node` inside `AgentState`.
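For example, any code that receives the state can read that field for logging (a sketch; the attribute path follows the description above):

```python
# Inside a node or callback that receives the state:
current = state.execution_meta.current_node
print(f"Currently executing node: {current}")
```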
Conditional Edges¶
```python
from pyagenity.utils import END

def classify(state: AgentState) -> str:
    last = state.context[-1].text() if state.context else ""
    if "tool" in last:
        return "TOOLS"
    if "bye" in last:
        return END
    return "RESPOND"

graph.add_node("CLASSIFY", classify)
graph.add_node("RESPOND", respond)
graph.add_node("TOOLS", tool_node)

graph.add_conditional_edges(
    "CLASSIFY",
    classify,
    {
        "RESPOND": "RESPOND",
        "TOOLS": "TOOLS",
        END: END,
    },
)
```
Rules:

- The function must return a string label
- Every possible label must exist in the mapping (including `END` if used)
- Missing labels raise at runtime when encountered

Prefer conditional edges over `Command` for predictable branching: they're easier to test and visualize.
Runtime Jumps with Command¶
If decision logic depends on external services or side effects performed inside the node, a `Command` can encode the next step:
```python
def after_tool(state, config):
    # Decide the next node imperatively, based on work done inside this node.
    if expensive_validation(state):
        return Command(goto="REPAIR")
    return Command(goto="SUMMARIZE")
```
Combine with conditional edges for hybrid strategies (e.g. coarse routing via conditional, fine branching via command).
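For instance, a hybrid might look like this (a sketch reusing the node names above; `needs_repair` is a hypothetical predicate):

```python
# Coarse, declarative routing from a state-derived label.
graph.add_conditional_edges(
    "CLASSIFY",
    classify,
    {"RESPOND": "RESPOND", "TOOLS": "TOOLS", END: END},
)

# Fine-grained, imperative branching decided inside the node at runtime.
def tools_followup(state, config):
    if needs_repair(state):  # hypothetical check over tool results
        return Command(goto="REPAIR")
    return Command(goto="RESPOND")
```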
Recursion / Step Limit¶
Each invoke has a recursion (step) limit (default 25 unless overridden in `config["recursion_limit"]`). After each node execution the counter increments; exceeding the limit raises a `GraphRecursionError`.
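For example, to raise the limit for a single run (a sketch assuming `app` is the compiled graph and the override lives in the per-run config dict, as described above):

```python
# Allow a longer tool loop for this run only.
result = app.invoke(
    input_data,
    config={"thread_id": "session-123", "recursion_limit": 50},
)
```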
Best practices:

- Ensure tool loops have terminal conditions
- Use explicit `END` returns in classification nodes when the conversation is done
- Log step counts for long-running sessions
Interrupts & Stop Requests¶
PyAgenity supports robust human-in-the-loop (HITL) patterns through interrupt and stop mechanisms:
Mechanism | Trigger | Effect | Use Case
---|---|---|---
`interrupt_before` / `interrupt_after` lists (compile) | Node name match | Execution halts and state is persisted before/after the node | Approval workflows, debug points
`stop()` / `astop()` | External API call with `thread_id` | Sets a stop flag; checked before executing the next node | Dynamic cancellation from a UI/frontend
Basic Interrupt Example¶
```python
from pyagenity.checkpointer import InMemoryCheckpointer

# Compile with interrupt points
app = graph.compile(
    checkpointer=InMemoryCheckpointer(),   # Required for resuming
    interrupt_before=["EXECUTE_TOOL"],     # Pause before tool execution for approval
    interrupt_after=["ANALYZE"],           # Pause after analysis for inspection
)

# Initial execution (will pause at the interrupt point)
result = app.invoke(input_data, config={"thread_id": "session-123"})

if result.get("interrupted"):
    print(f"Paused: {result['interrupt_reason']}")
    # Human review/approval logic here...

# Resume with the same thread_id
final_result = app.invoke(
    {"messages": [Message.text_message("Approved")]},
    config={"thread_id": "session-123"},
)
```
Dynamic Stop Control¶
```python
import threading
import time

# Start the agent in the background
def run_agent():
    for chunk in app.stream(input_data, config={"thread_id": "my-session"}):
        print(chunk.content)

agent_thread = threading.Thread(target=run_agent)
agent_thread.start()

# Stop from external code (e.g., a frontend button click)
time.sleep(2.0)
status = app.stop({"thread_id": "my-session"})
print(f"Stop requested: {status}")
```
Key Requirements:

- Checkpointer: Required for interrupt resume functionality
- Thread ID: Must be consistent between initial execution and resume
- State Persistence: Interrupted state is automatically saved and restored
For comprehensive HITL patterns, approval workflows, debug strategies, and advanced interrupt handling, see Human-in-the-Loop & Interrupts.
Handling Tool Routing¶
A typical pattern:
- Reasoning node produces an assistant message with `tool_calls`
- Conditional edge or `Command` routes to the `TOOL` node
- `ToolNode` executes each tool (injection: `tool_call_id`, `state`, other DI)
- Tool messages are appended (role = `tool`)
- Edge returns to the reasoning node for the final answer
Ensure your conditional edge logic treats a trailing `tool` role message as a signal to proceed to the final response (see `examples/react/react_sync.py`).
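A sketch of such a router (the attribute names `tool_calls` and `role` are assumptions about the message shape, not a confirmed API; node names are illustrative):

```python
def route_from_reasoning(state: AgentState) -> str:
    if not state.context:
        return END
    last = state.context[-1]
    if getattr(last, "tool_calls", None):       # assistant requested tools
        return "TOOL"
    if getattr(last, "role", None) == "tool":   # trailing tool output: finish up
        return "RESPOND"
    return END
```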
Error Paths¶
Unhandled exceptions in a node:
- Mark execution state as error
- Emit an error event via the publisher (if configured)
- Propagate unless caught; you can wrap risky sections and return a recovery `Command`

Use callback hooks (DI: `CallbackManager`) to add custom retry/backoff policies.
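For example, a node might catch a failure and steer to a repair branch instead of letting the exception propagate (a sketch; `call_external_service` and the node names are hypothetical):

```python
def risky_step(state, config):
    try:
        call_external_service(state)    # hypothetical helper that may raise
    except Exception:
        return Command(goto="REPAIR")   # recover via a dedicated repair node
    return Command(goto="SUMMARIZE")
```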
Debug Checklist¶
Symptom | Likely Cause | Fix
---|---|---
Stalls mid-run | Missing edge or wrong label | Verify `add_conditional_edges` mapping keys exactly match returned labels
Infinite loop | No terminal condition and a high step limit | Add a termination branch or reduce the recursion limit
Unexpected END | Condition returned END prematurely | Inspect classifier logic with logging
Duplicate tool execution | Node re-emits the same tool calls | Prune executed tool calls or gate with a state flag
Next: Tools & Integrations (`tools.md`).