Graph

When to use this

Use StateGraph and CompiledGraph when you are building or running a workflow. Every agentflow application starts with a graph.

Import paths

from agentflow.core.graph import StateGraph, Agent, ToolNode
from agentflow.utils import START, END

Constants

Name   Value        Description
START  "__start__"  Sentinel node name. Use as the source of the first edge to set the entry point.
END    "__end__"    Sentinel node name. Add an edge to END from any terminal node.

StateGraph[StateT]

The builder class. Construct a workflow by adding nodes and edges, then call compile() to get an executable CompiledGraph.

Constructor

from agentflow.core.graph import StateGraph
from agentflow.core.state import AgentState

graph = StateGraph() # default AgentState
graph = StateGraph(MyCustomState()) # custom state instance
graph = StateGraph(MyCustomState) # custom state class (instantiated automatically)

Parameters:

Parameter        Type                          Default               Description
state            StateT | type[StateT] | None  None                  Initial state instance or class. Defaults to AgentState().
context_manager  BaseContextManager | None     None                  Cross-node state operation handler.
publisher        BasePublisher | None          None                  Event publisher for monitoring.
id_generator     BaseIDGenerator               DefaultIDGenerator()  ID generator for messages/threads.
container        InjectQ | None                None                  Dependency injection container. Uses global singleton if None.

Methods

add_node

graph.add_node(func)                     # name inferred from function.__name__
graph.add_node("my_node", func)          # explicit name
graph.add_node("agent", agent_instance)  # Agent or ToolNode instance

Returns StateGraph for chaining.

Calling pattern          When to use
add_node(func)           Simple function nodes where the function name is a good node name.
add_node("name", func)   When you need a custom node name.
add_node("name", agent)  When attaching an Agent or ToolNode instance.
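The two calling patterns and the chaining behaviour can be pictured with a toy builder. This is only a sketch: ToyGraph and its internals are hypothetical stand-ins written for illustration, not agentflow's StateGraph implementation.

```python
from typing import Callable, Optional, Union

NodeFunc = Callable[..., object]


class ToyGraph:
    """Hypothetical stand-in for a graph builder; not agentflow's StateGraph."""

    def __init__(self) -> None:
        self.nodes: dict[str, NodeFunc] = {}

    def add_node(
        self,
        name_or_func: Union[str, NodeFunc],
        func: Optional[NodeFunc] = None,
    ) -> "ToyGraph":
        if func is None:
            if not callable(name_or_func):
                raise TypeError("a single argument must be a callable")
            # One-argument form: infer the node name from the function.
            name, target = name_or_func.__name__, name_or_func
        else:
            if not isinstance(name_or_func, str):
                raise TypeError("the explicit node name must be a string")
            # Two-argument form: the caller chooses the node name.
            name, target = name_or_func, func
        self.nodes[name] = target
        return self  # returning self is what makes chaining possible


def fetch(state, config):
    return []


def summarize(state, config):
    return []


# Chained calls: one inferred name, one explicit name.
graph = ToyGraph().add_node(fetch).add_node("summary", summarize)
node_names = list(graph.nodes)
```

Here node_names ends up as ["fetch", "summary"]: the first node takes its name from the function, the second from the explicit string.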

add_edge

graph.add_edge(START, "entry")   # also sets the entry point
graph.add_edge("entry", END)
graph.add_edge("node_a", "node_b")

Static, unconditional edge. The graph always follows this route.


add_conditional_edges

def route(state: AgentState, config: dict) -> str:
    if state.data.get("is_done"):
        return END
    return "process"

graph.add_conditional_edges("check", route)

# With a path map
graph.add_conditional_edges(
    "classify",
    lambda state, config: state.data.get("category", "default"),
    path_map={
        "urgent": "urgent_handler",
        "normal": "normal_handler",
        "default": "fallback",
    },
)

Parameters:

Parameter  Type                   Description
from_node  str                    Source node name.
condition  Callable               Function receiving (state, config) and returning a routing key.
path_map   dict[str, str] | None  Maps condition return values to node names. If None, the condition must return a node name directly.
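The difference between routing with and without a path map can be sketched in plain Python. The helper resolve_next_node below is hypothetical and uses plain dicts for state. Raising KeyError for an unmapped key is a choice made for this sketch, not documented agentflow behaviour.

```python
from typing import Callable, Optional

END = "__end__"  # sentinel value taken from the Constants table


def resolve_next_node(
    condition: Callable[[dict, dict], str],
    state: dict,
    config: dict,
    path_map: Optional[dict[str, str]] = None,
) -> str:
    """Hypothetical helper: turn a condition's return value into a node name."""
    key = condition(state, config)
    if path_map is None:
        # Without a path map, the returned key is already the target node name.
        return key
    if key not in path_map:
        # Assumption for this sketch: unmapped keys are treated as an error.
        raise KeyError(f"routing key {key!r} has no entry in path_map")
    return path_map[key]


def route(state: dict, config: dict) -> str:
    return END if state.get("is_done") else "process"


def classify(state: dict, config: dict) -> str:
    return state.get("category", "default")


path_map = {
    "urgent": "urgent_handler",
    "normal": "normal_handler",
    "default": "fallback",
}

direct = resolve_next_node(route, {"is_done": True}, {})
mapped = resolve_next_node(classify, {"category": "urgent"}, {}, path_map)
fallback = resolve_next_node(classify, {}, {}, path_map)
```

A path map keeps routing keys (such as "urgent") separate from node names, so a node can be renamed without touching the condition function.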

set_entry_point

graph.set_entry_point("my_node")
# equivalent to graph.add_edge(START, "my_node")

override_node

graph.override_node("MAIN", test_agent)

Replaces an existing node with a new function, Agent, or ToolNode. The node must already exist. Useful for swapping production nodes with test doubles before compilation.


compile

app: CompiledGraph = graph.compile()

app = graph.compile(
    checkpointer=my_checkpointer,
    store=my_store,
    media_store=my_media_store,
    interrupt_before=["review_node"],
    interrupt_after=["tool_node"],
    shutdown_timeout=30.0,
)

Parameters:

Parameter         Type                     Default            Description
checkpointer      BaseCheckpointer | None  None               State persistence backend. Defaults to InMemoryCheckpointer.
store             BaseStore | None         None               Long-term memory store.
media_store       BaseMediaStore | None    None               Media/file storage backend.
interrupt_before  list[str] | None         None               Node names to pause execution before.
interrupt_after   list[str] | None         None               Node names to pause execution after.
callback_manager  CallbackManager          CallbackManager()  Hooks for evaluation collectors and monitoring.
shutdown_timeout  float                    30.0               Seconds to wait for background tasks during graceful shutdown.

Raises: GraphError if no entry point is set or interrupt nodes are invalid.


CompiledGraph[StateT]

The executable graph produced by StateGraph.compile(). Do not instantiate directly.

invoke

result = app.invoke(
    {"messages": [Message.text_message("Hello")]},
    config={"thread_id": "session-1", "user_id": "alice"},
    response_granularity=ResponseGranularity.LOW,
)

Synchronous execution. Blocks until the graph finishes.

Uses asyncio.run() internally, so it must not be called from inside a running event loop (for example, within an async function). Use ainvoke() there instead.
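The underlying restriction belongs to asyncio and can be reproduced with the standard library alone. In this sketch, sync_entry is a hypothetical stand-in for any blocking API built on asyncio.run(); no agentflow code is involved.

```python
import asyncio


async def do_work() -> str:
    await asyncio.sleep(0)
    return "done"


def sync_entry() -> str:
    """Stand-in for a blocking API that wraps asyncio.run()."""
    coro = do_work()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler() -> tuple:
    # Inside a running event loop the blocking entry point fails...
    try:
        sync_entry()
        outcome = "no error"
    except RuntimeError:
        outcome = "RuntimeError"
    # ...while awaiting the coroutine directly works.
    result = await do_work()
    return outcome, result


outside = sync_entry()          # no loop is running here, so this succeeds
inside = asyncio.run(handler())  # the nested call inside handler() fails
```

The same call succeeds at module level and fails inside handler(), which is why async code should await the asynchronous variant rather than call the blocking one.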


ainvoke

result = await app.ainvoke(
    {"messages": [Message.text_message("Hello")]},
    config={"thread_id": "session-1"},
    response_granularity=ResponseGranularity.FULL,
)

Asynchronous execution. Auto-detects whether to start fresh or resume from an interrupted state.

Parameters (both invoke and ainvoke):

Parameter             Type                   Default   Description
input_data            dict[str, Any]         required  Input dict. Must contain "messages" for new runs.
config                dict[str, Any] | None  None      Execution config. Keys: thread_id, user_id, run_id, recursion_limit.
response_granularity  ResponseGranularity    LOW       Controls how much is included in the response. See the ResponseGranularity values table.

ResponseGranularity values (import with: from agentflow.utils import ResponseGranularity):

Value    What is returned
LOW      messages only
PARTIAL  messages, context, summary
FULL     messages, context, summary, full state object

Returns: dict with keys depending on granularity.


stream / astream

# Sync
for chunk in app.stream({"messages": [msg]}, config={"thread_id": "t1"}):
    if chunk.event == "message":
        print(chunk.message.content)

# Async
async for chunk in app.astream({"messages": [msg]}, config={"thread_id": "t1"}):
    if chunk.event == "message":
        print(chunk.message.content)

Yields StreamChunk objects as the graph executes.

StreamChunk fields:

Field      Type               Description
event      StreamEvent        One of MESSAGE, STATE, UPDATES, ERROR.
message    Message | None     Populated for MESSAGE events.
state      AgentState | None  Populated for STATE events.
data       dict | None        Populated for UPDATES and ERROR events.
thread_id  str | None         Thread ID for this execution.
run_id     str | None         Run ID for this execution.
timestamp  float              UNIX timestamp of chunk creation.

stop / astop

# Async — call from within an async route handler
resp = await app.astop(config={"thread_id": "session-1"})
# {"ok": True, "running": True}

# Sync
resp = app.stop(config={"thread_id": "session-1"})

Sets a stop flag on the running execution. The graph checks this flag between node transitions and halts cleanly.

Returns: dict with ok, running, and optional reason keys.
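Checking a flag between node transitions is a form of cooperative cancellation. The following sketch shows the idea with a threading.Event. The run_until_stopped function and the toy nodes are hypothetical and say nothing about how agentflow stores or checks its flag.

```python
import threading
from typing import Callable


def run_until_stopped(
    nodes: list[Callable[[list[str]], None]],
    stop_flag: threading.Event,
) -> list[str]:
    """Run nodes in order, checking the stop flag before each transition."""
    trace: list[str] = []
    for node in nodes:
        if stop_flag.is_set():
            trace.append("halted")
            break
        node(trace)
    return trace


stop_flag = threading.Event()


def first(trace: list[str]) -> None:
    trace.append("first")


def second(trace: list[str]) -> None:
    trace.append("second")
    stop_flag.set()  # simulate a stop request arriving while this node runs


def third(trace: list[str]) -> None:
    trace.append("third")


trace = run_until_stopped([first, second, third], stop_flag)
```

In this model the node that is already running finishes its work, and the stop takes effect at the next transition, so third never runs.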


override_node (on CompiledGraph)

compiled.override_node("MAIN", test_agent)

Same semantics as StateGraph.override_node() but on an already-compiled graph. Useful when testing a compiled production graph without rebuilding it.


aclose

await app.aclose()

Cleanup method for graceful shutdown. Stops background tasks, closes publisher connections, and releases any held resources. Always call this when shutting down a long-lived application.
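One way to make sure cleanup always happens is to wrap the application's lifetime in try/finally. The sketch below uses FakeApp, a hypothetical stand-in that only models ainvoke() and aclose(), to show that the cleanup call runs even when a request fails.

```python
import asyncio


class FakeApp:
    """Hypothetical stand-in for a compiled graph; not agentflow code."""

    def __init__(self) -> None:
        self.closed = False

    async def ainvoke(self, payload: dict) -> dict:
        if payload.get("fail"):
            raise ValueError("simulated node failure")
        return {"messages": ["ok"]}

    async def aclose(self) -> None:
        self.closed = True


async def run_application(app: FakeApp, payloads: list) -> list:
    """Serve all payloads, then release resources no matter what happened."""
    results = []
    try:
        for payload in payloads:
            results.append(await app.ainvoke(payload))
        return results
    finally:
        await app.aclose()


ok_app = FakeApp()
ok_results = asyncio.run(run_application(ok_app, [{}, {}]))

failing_app = FakeApp()
try:
    asyncio.run(run_application(failing_app, [{"fail": True}]))
    failure_seen = False
except ValueError:
    failure_seen = True
```

Both ok_app.closed and failing_app.closed are True afterwards, because the finally block runs on the success path and on the error path.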


Config dictionary keys

Key              Type       Auto-generated if missing        Description
thread_id        str | int  Yes (UUID or from ID generator)  Identifies the conversation thread for checkpointing.
user_id          str        Yes ("test-user-id")             User identifier. Override in production.
run_id           str | int  Yes                              Unique run identifier.
recursion_limit  int        No (framework default: 25)       Maximum node transitions before GraphRecursionError.
timestamp        str        Yes                              ISO 8601 timestamp of the run.

Node function signature

Node functions receive the current state and config as their first two positional arguments. Additional keyword arguments are resolved from the dependency injection container:

from agentflow.core.state import AgentState
from agentflow.storage.store import BaseStore

def my_node(state: AgentState, config: dict, store: BaseStore) -> list:
    # Return a list of Message objects to append to state.context
    return [Message.text_message("response", role="assistant")]

Alternatively, functions can return an updated state dict:

def my_node(state: AgentState, config: dict) -> dict:
    return {"context_summary": "Updated summary"}
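The two return styles can be thought of as two ways of merging a node's output into the state. The sketch below is one plausible reading, not agentflow's actual merge logic: ToyState and apply_node_result are hypothetical, and the rule that unknown field names raise an error is an assumption of this example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyState:
    """Hypothetical state with just the two fields used in this section."""

    context: list = field(default_factory=list)
    context_summary: str = ""


def apply_node_result(state: ToyState, result: Any) -> ToyState:
    if isinstance(result, list):
        # List return: treat the items as messages and append them.
        state.context.extend(result)
    elif isinstance(result, dict):
        # Dict return: treat each key as a state field to overwrite.
        for name, value in result.items():
            if not hasattr(state, name):
                raise AttributeError(f"unknown state field {name!r}")
            setattr(state, name, value)
    else:
        raise TypeError(f"unsupported node result type: {type(result).__name__}")
    return state


state = ToyState()
apply_node_result(state, ["response"])
apply_node_result(state, {"context_summary": "Updated summary"})
```

After both calls, state.context is ["response"] and state.context_summary is "Updated summary".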

Common errors

Error                          Cause                                                                          Fix
GraphError GRAPH_002           compile() called with no entry point.                                          Call set_entry_point() or add_edge(START, "node").
GraphError GRAPH_004           interrupt_before or interrupt_after contains a node name that does not exist.  Check node names against graph.nodes.keys().
GraphRecursionError            Execution exceeded recursion_limit.                                            Increase recursion_limit in config or fix a cycle in the graph.
RuntimeError in async context  invoke() called inside an async function.                                      Use ainvoke() instead.
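A recursion limit is needed because a cycle in the graph never reaches END on its own. The toy runner below follows static edges and counts transitions. Everything in it is hypothetical, including run_static_graph, TransitionLimitExceeded, and the exact way transitions are counted; the framework may count differently.

```python
END = "__end__"  # sentinel value taken from the Constants table


class TransitionLimitExceeded(RuntimeError):
    """Hypothetical stand-in for the framework's recursion error."""


def run_static_graph(
    edges: dict[str, str],
    entry: str,
    recursion_limit: int = 25,
) -> list[str]:
    """Follow unconditional edges from entry until END or the limit is hit."""
    visited: list[str] = []
    current = entry
    transitions = 0
    while current != END:
        if transitions >= recursion_limit:
            raise TransitionLimitExceeded(
                f"exceeded {recursion_limit} transitions at node {current!r}"
            )
        visited.append(current)
        current = edges[current]
        transitions += 1
    return visited


# A linear graph terminates well under the limit.
linear_path = run_static_graph({"fetch": "summarize", "summarize": END}, "fetch")

# A cycle with no exit hits the limit regardless of how high it is set.
cyclic_edges = {"draft": "review", "review": "draft"}
try:
    run_static_graph(cyclic_edges, "draft", recursion_limit=50)
    limit_hit = False
except TransitionLimitExceeded:
    limit_hit = True
```

Raising the limit only helps when the loop is intentional and eventually exits through a conditional edge. An unconditional cycle like the one above has to be fixed in the graph itself.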