Custom ReAct — Advanced¶
This tutorial shows how to build a ReAct agent using custom async functions instead of the Agent class. Use this approach when you need full control over the LLM call — for example, to use a provider not supported by the Agent class, or to chain multiple LLM calls within one node.
Already done the basics?
If you haven't read ReAct with Agent Class, start there. The Agent class covers 95% of use cases in far fewer lines of code.
When to Use Custom Functions¶
| Use Custom Functions When... | Use Agent Class When... |
|---|---|
| You need a provider not in "openai" or "google" | Standard OpenAI or Google Gemini |
| Multiple LLM calls in one node | Single LLM call per node |
| Custom message preprocessing | Standard message flow |
| Non-standard response parsing | Standard text/tool output |
How Custom Functions Work¶
A custom node function is an `async def` that:
- Receives `state: AgentState` and `config: dict`
- Manually calls `convert_messages()` to build the message list
- Calls the LLM SDK directly
- Returns `ModelResponseConverter(response, converter="openai")` — the graph engine processes this automatically
from agentflow.utils.converter import convert_messages
from agentflow.adapters.llm.model_response_converter import ModelResponseConverter
async def my_agent(state: AgentState, config: dict) -> ModelResponseConverter:
messages = convert_messages(
system_prompts=[{"role": "system", "content": "You are a helpful assistant."}],
state=state,
)
    # Call your LLM here (assumes `client` is an AsyncOpenAI instance, as in Step 1 below)
response = await client.chat.completions.create(model="gpt-4o", messages=messages)
return ModelResponseConverter(response, converter="openai")
Complete Example: Weather Agent with Custom Functions¶
Step 1: Imports and Setup¶
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from agentflow.graph import StateGraph, ToolNode
from agentflow.state import AgentState, Message
from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.adapters.llm.model_response_converter import ModelResponseConverter
from agentflow.utils.converter import convert_messages
from agentflow.utils.constants import END
load_dotenv()
client = AsyncOpenAI() # Reads OPENAI_API_KEY from environment
Step 2: Define Tools¶
def get_weather(location: str) -> str:
"""
Get the current weather for a location.
Args:
location: City name (e.g., "London", "Tokyo")
Returns:
Weather information as a string
"""
# In production: call a real weather API
return f"The weather in {location} is sunny, 72°F"
def get_forecast(location: str, days: int = 3) -> str:
"""
Get the weather forecast for the next N days.
Args:
location: City name
days: Number of days to forecast (1-7)
Returns:
Forecast summary
"""
return f"{days}-day forecast for {location}: Mostly sunny with highs around 70°F"
tool_node = ToolNode([get_weather, get_forecast])
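Curious what the LLM actually sees? You can pretty-print the schemas `ToolNode` derives from these docstrings. A quick check, assuming `all_tools()` returns OpenAI-style tool definitions as plain dicts (it is awaited the same way in Step 3):

```python
import asyncio
import json

async def show_schemas():
    # Print the tool definitions ToolNode generated from the docstrings above
    tools = await tool_node.all_tools()
    print(json.dumps(tools, indent=2))

asyncio.run(show_schemas())
```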
Step 3: Custom Agent Function¶
SYSTEM_PROMPT = [{"role": "system", "content": """You are a helpful weather assistant.
When users ask about weather, use the available tools to provide accurate information.
If asked about a forecast, use get_forecast. For current conditions, use get_weather."""}]
async def main_agent(state: AgentState, config: dict) -> ModelResponseConverter:
"""Custom reasoning node — calls the LLM directly."""
# Build the message list from state
messages = convert_messages(
system_prompts=SYSTEM_PROMPT,
state=state,
)
# If we just received tool results, respond without offering tools again
if state.context and state.context[-1].role == "tool":
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.7,
)
else:
# First call — offer tools to the LLM
tools = await tool_node.all_tools()
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools,
temperature=0.7,
)
return ModelResponseConverter(response, converter="openai")
The `converter=` value
Use `converter="openai"` for OpenAI-compatible responses and `converter="google"` for Google GenAI responses.
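One payoff of writing the node yourself is chaining multiple LLM calls before returning, one of the use cases from the table above. A minimal sketch of a hypothetical draft-then-refine node, reusing `client` and `SYSTEM_PROMPT` from this example and assuming `convert_messages()` returns a plain list of OpenAI-format dicts (its direct pass-through in `main_agent` suggests it does):

```python
async def draft_and_refine(state: AgentState, config: dict) -> ModelResponseConverter:
    messages = convert_messages(system_prompts=SYSTEM_PROMPT, state=state)

    # First call: produce a rough draft answer
    draft = await client.chat.completions.create(model="gpt-4o", messages=messages)
    draft_text = draft.choices[0].message.content

    # Second call: have the model tighten its own draft
    refined = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages
        + [
            {"role": "assistant", "content": draft_text},
            {"role": "user", "content": "Rewrite your answer above more concisely."},
        ],
    )
    # Only the final response is handed back to the graph engine
    return ModelResponseConverter(refined, converter="openai")
```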
Step 4: Routing Function¶
def route(state: AgentState) -> str:
"""Decide what runs next based on the last message."""
if not state.context:
return END
last = state.context[-1]
# Agent requested tool calls → run tools
if hasattr(last, "tools_calls") and last.tools_calls and last.role == "assistant":
return "TOOL"
# Default → done
return END
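With this routing, a tool-using turn cycles MAIN → TOOL → MAIN → END, while a direct answer goes straight from MAIN to END. The fixed TOOL → MAIN edge that closes the loop is added in the next step.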
Step 5: Build and Compile the Graph¶
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)
graph.add_conditional_edges("MAIN", route, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN") # Fixed edge: after tools, back to agent
graph.set_entry_point("MAIN")
app = graph.compile(checkpointer=InMemoryCheckpointer())
Step 6: Run It¶
async def main():
queries = [
"What's the weather in Tokyo right now?",
"Give me a 5-day forecast for Paris.",
]
for i, query in enumerate(queries):
print(f"\n--- Query {i + 1}: {query}")
result = await app.ainvoke(
{"messages": [Message.text_message(query, "user")]},
config={"thread_id": f"session-{i}", "recursion_limit": 10},
)
print(f"Answer: {result['messages'][-1].content}")
if __name__ == "__main__":
asyncio.run(main())
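Because the stub tools always report sunny skies, a run should print something along these lines (exact wording varies by model):

```
--- Query 1: What's the weather in Tokyo right now?
Answer: The weather in Tokyo right now is sunny, 72°F.

--- Query 2: Give me a 5-day forecast for Paris.
Answer: The 5-day forecast for Paris is mostly sunny with highs around 70°F.
```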
Using Google Gemini with Custom Functions¶
from google import genai
from google.genai import types
import os
google_client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
async def gemini_agent(state: AgentState, config: dict) -> ModelResponseConverter:
"""Custom agent node using Google Gemini directly."""
messages = convert_messages(
system_prompts=SYSTEM_PROMPT,
state=state,
)
# Separate system message from conversation messages
system_instruction = None
contents = []
for msg in messages:
if msg["role"] == "system":
system_instruction = msg["content"]
else:
contents.append(msg["content"])
google_config = types.GenerateContentConfig(
system_instruction=system_instruction,
temperature=0.7,
)
response = await google_client.aio.models.generate_content(
model="gemini-2.5-flash",
contents=contents,
config=google_config,
)
return ModelResponseConverter(response, converter="google")
Routing with Loop Prevention¶
For complex agents that might call many tools, add loop detection:
def safe_route(state: AgentState) -> str:
"""Routing with loop prevention."""
if not state.context:
return END
# Prevent infinite loops: stop after too many tool calls
tool_call_count = sum(1 for msg in state.context if msg.role == "tool")
if tool_call_count >= 5:
return END # Force stop
last = state.context[-1]
if hasattr(last, "tools_calls") and last.tools_calls and last.role == "assistant":
return "TOOL"
return END
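To enable it, point the conditional edge at `safe_route` instead of `route`; the `recursion_limit` passed in Step 6's config acts as a second, graph-level safety net:

```python
graph.add_conditional_edges("MAIN", safe_route, {"TOOL": "TOOL", END: END})
```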
Complete Working Code¶
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from agentflow.graph import StateGraph, ToolNode
from agentflow.state import AgentState, Message
from agentflow.checkpointer import InMemoryCheckpointer
from agentflow.adapters.llm.model_response_converter import ModelResponseConverter
from agentflow.utils.converter import convert_messages
from agentflow.utils.constants import END
load_dotenv()
client = AsyncOpenAI()
def get_weather(location: str) -> str:
"""Get the current weather for a location."""
return f"The weather in {location} is sunny, 72°F"
def get_forecast(location: str, days: int = 3) -> str:
"""Get the weather forecast for the next N days."""
return f"{days}-day forecast for {location}: Mostly sunny, highs around 70°F"
tool_node = ToolNode([get_weather, get_forecast])
SYSTEM_PROMPT = [{"role": "system", "content": (
"You are a helpful weather assistant. "
"Use get_weather for current conditions and get_forecast for upcoming days."
)}]
async def main_agent(state: AgentState, config: dict) -> ModelResponseConverter:
messages = convert_messages(system_prompts=SYSTEM_PROMPT, state=state)
if state.context and state.context[-1].role == "tool":
response = await client.chat.completions.create(
model="gpt-4o", messages=messages
)
else:
tools = await tool_node.all_tools()
response = await client.chat.completions.create(
model="gpt-4o", messages=messages, tools=tools
)
return ModelResponseConverter(response, converter="openai")
def route(state: AgentState) -> str:
if not state.context:
return END
last = state.context[-1]
if hasattr(last, "tools_calls") and last.tools_calls and last.role == "assistant":
return "TOOL"
return END
graph = StateGraph()
graph.add_node("MAIN", main_agent)
graph.add_node("TOOL", tool_node)
graph.add_conditional_edges("MAIN", route, {"TOOL": "TOOL", END: END})
graph.add_edge("TOOL", "MAIN")
graph.set_entry_point("MAIN")
app = graph.compile(checkpointer=InMemoryCheckpointer())
async def main():
result = await app.ainvoke(
{"messages": [Message.text_message("What's the weather in Tokyo?", "user")]},
config={"thread_id": "demo", "recursion_limit": 10},
)
print(result["messages"][-1].content)
if __name__ == "__main__":
asyncio.run(main())
Next Steps¶
- Dependency Injection — Inject services and config into node functions with InjectQ
- MCP Integration — Connect to Model Context Protocol servers
- Streaming — Stream tokens in real-time
- Unit Testing — Test agents without real LLM calls