Async Pattern Standardization
This guide explains when and how to use synchronous and asynchronous patterns in Agentflow, following Python asyncio best practices.
Overview
Agentflow is built on asyncio for efficient handling of I/O-bound operations like:
- LLM API calls
- Database queries
- File I/O
- Network requests
- Message queue operations
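Agentflow also provides synchronous APIs for convenience: a compiled graph exposes both `ainvoke()` and `invoke()`, and `invoke()` runs the async implementation through `asyncio.run()` internally. Picking the right one matters, because the wrong choice can block the event loop or fail outright inside async code.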
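To make that relationship concrete, here is a minimal sketch of how a synchronous facade over an async API typically works. `ToyGraph` is a hypothetical stand-in, not Agentflow's class; the only assumption carried over is the one stated in the Best Practice 1 comment, that the sync method wraps the async one with `asyncio.run()`.

```python
import asyncio


class ToyGraph:
    """Hypothetical stand-in for a compiled graph (not Agentflow's real class)."""

    async def ainvoke(self, input_data: dict) -> dict:
        # Pretend to do async I/O (e.g. an LLM call) before returning.
        await asyncio.sleep(0.01)
        return {"echo": input_data}

    def invoke(self, input_data: dict) -> dict:
        # Sync facade: start a fresh event loop, run the coroutine, close the loop.
        # asyncio.run() raises RuntimeError if a loop is already running in this
        # thread, which is why invoke() must not be called from async code.
        return asyncio.run(self.ainvoke(input_data))


graph = ToyGraph()
print(graph.invoke({"q": "hi"}))  # {'echo': {'q': 'hi'}}
```

The facade is convenient for scripts, but each call pays for creating and tearing down an event loop, and it cannot be nested inside a running loop.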
When to Use Async vs Sync
Use Async When:
- Your application is async: if your main application uses asyncio, use the async APIs
- You are running in an async framework: FastAPI, aiohttp, Quart, etc.
- You are handling multiple concurrent operations: for example, several LLM calls or database queries in flight at once
- You are streaming responses: processing output in real time as it arrives
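The concurrency benefit is easy to see with a toy benchmark. In this sketch `asyncio.sleep()` stands in for an I/O-bound call such as an LLM request (`fake_llm_call` is hypothetical): three 0.2 s "calls" awaited one after another take about 0.6 s, while `asyncio.gather()` overlaps their waiting and finishes in roughly 0.2 s.

```python
import asyncio
import time


async def fake_llm_call(i: int) -> str:
    # Hypothetical stand-in for an I/O-bound request: waiting yields to the loop.
    await asyncio.sleep(0.2)
    return f"reply {i}"


async def sequential() -> list[str]:
    # Each await finishes before the next call starts.
    return [await fake_llm_call(i) for i in range(3)]


async def concurrent() -> list[str]:
    # All three coroutines wait at the same time.
    return await asyncio.gather(*(fake_llm_call(i) for i in range(3)))


for runner in (sequential, concurrent):
    start = time.perf_counter()
    replies = asyncio.run(runner())
    print(f"{runner.__name__}: {time.perf_counter() - start:.2f}s {replies}")
```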
Use Sync When:
- Simple scripts or notebooks: Jupyter notebooks, one-off scripts
- Interactive exploration: REPL, debugging
- Integration with sync frameworks: Flask, Django (without async views)
- Testing simple scenarios: quick unit tests
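Best Practices
1. Don't Mix Event Loops
❌ BAD:
```python
async def main():
    # invoke() calls asyncio.run() internally, and asyncio.run() raises
    # RuntimeError when an event loop is already running in this thread.
    result = graph.invoke(input_data)
```
✅ GOOD: inside a coroutine, await the async API directly with `result = await graph.ainvoke(input_data)`.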
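A self-contained version of the good pattern, for illustration. `StubGraph` is a hypothetical placeholder for a compiled Agentflow graph; only the shape of the code carries over: `await graph.ainvoke(...)` inside `async def`, and a single `asyncio.run()` at the program's entry point.

```python
import asyncio


class StubGraph:
    """Hypothetical placeholder for a compiled graph."""

    async def ainvoke(self, input_data: dict) -> dict:
        await asyncio.sleep(0)  # simulate an async step
        return {"result": input_data["messages"][-1].upper()}


graph = StubGraph()


async def main() -> dict:
    # Already inside the event loop: await the coroutine, no nested asyncio.run().
    return await graph.ainvoke({"messages": ["hello"]})


if __name__ == "__main__":
    # The only asyncio.run() call lives at the entry point.
    print(asyncio.run(main()))  # {'result': 'HELLO'}
```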
2. Use Context Managers for Resource Cleanup
✅ Guaranteed cleanup with try/finally (preferred for async apps):
```python
async def main():
    graph = build_graph().compile()
    try:
        result = await graph.ainvoke(input_data)
    finally:
        await graph.aclose()  # runs even if ainvoke() raises
```
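If you prefer an actual `async with` block, one option is to wrap the same try/finally in `contextlib.asynccontextmanager`. This is a sketch, not an Agentflow API: `managed_graph` and `DummyCompiled` are hypothetical, and the only assumption about a real compiled graph is that it has the `ainvoke()` and `aclose()` coroutines used above.

```python
import asyncio
from contextlib import asynccontextmanager


class DummyCompiled:
    """Hypothetical stand-in exposing ainvoke()/aclose() like a compiled graph."""

    def __init__(self) -> None:
        self.closed = False

    async def ainvoke(self, input_data: dict) -> dict:
        return {"ok": True, **input_data}

    async def aclose(self) -> None:
        self.closed = True


@asynccontextmanager
async def managed_graph(compiled):
    # Hand the graph to the caller; aclose() runs on normal exit and on errors.
    try:
        yield compiled
    finally:
        await compiled.aclose()


async def main() -> DummyCompiled:
    compiled = DummyCompiled()
    async with managed_graph(compiled) as graph:
        print(await graph.ainvoke({"x": 1}))  # {'ok': True, 'x': 1}
    return compiled


closed_graph = asyncio.run(main())
print(closed_graph.closed)  # True
```

The wrapper keeps call sites short and makes it hard to forget `aclose()`, at the cost of one extra helper per resource type.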
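3. Avoid Blocking Operations in Async Code
❌ BAD:
```python
import time

import requests
from agentflow import AgentState

async def process_node(state: AgentState) -> AgentState:
    # Both calls block the event loop: no other task can run meanwhile.
    time.sleep(5)
    response = requests.get("https://api.example.com")  # blocking I/O
    return state
```
✅ GOOD:
```python
import asyncio

import aiohttp
from agentflow import AgentState

async def process_node(state: AgentState) -> AgentState:
    # Non-blocking: the loop runs other tasks while these wait.
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            data = await response.json()
    # ... update state from data here
    return state
```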
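When a blocking call has no async equivalent (a sync-only SDK, for example), you can move it off the event loop with `asyncio.to_thread()` (Python 3.9+). In this sketch `slow_sync_lookup` is a hypothetical blocking function whose latency is simulated with `time.sleep`; because it runs in a worker thread, the second coroutine proceeds at the same time.

```python
import asyncio
import time


def slow_sync_lookup(key: str) -> str:
    # Hypothetical blocking call (e.g. a sync-only client library).
    time.sleep(0.2)
    return key.upper()


async def main() -> list:
    # The blocking call runs in the default thread pool, so the event loop
    # stays free to run the other awaitable concurrently.
    return await asyncio.gather(
        asyncio.to_thread(slow_sync_lookup, "abc"),
        asyncio.sleep(0.2, result="other work done"),
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results)  # ['ABC', 'other work done']
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.2s rather than 0.4s
```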
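4. Use asyncio.gather for Concurrent Operations
```python
import asyncio

async def parallel_processing(inputs: list[dict]) -> list:
    """Process multiple inputs concurrently."""
    tasks = [graph.ainvoke(inp) for inp in inputs]
    # return_exceptions=True places exceptions in the result list instead of
    # raising the first one; results keep the same order as inputs.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
```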
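Because `return_exceptions=True` returns failures as ordinary values, callers need to separate them from successes. A minimal sketch, with a hypothetical `flaky_invoke` coroutine standing in for `graph.ainvoke`:

```python
import asyncio


async def flaky_invoke(inp: dict) -> dict:
    # Hypothetical stand-in for graph.ainvoke: fails for negative inputs.
    await asyncio.sleep(0)
    if inp["n"] < 0:
        raise ValueError(f"bad input: {inp['n']}")
    return {"n": inp["n"] * 2}


async def parallel_processing(inputs: list) -> tuple:
    results = await asyncio.gather(
        *(flaky_invoke(inp) for inp in inputs), return_exceptions=True
    )
    # Results line up with inputs, so zip() pairs each input with its outcome.
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [(inp, r) for inp, r in zip(inputs, results) if isinstance(r, BaseException)]
    return successes, failures


ok, failed = asyncio.run(parallel_processing([{"n": 1}, {"n": -1}, {"n": 3}]))
print(ok)                               # [{'n': 2}, {'n': 6}]
print([str(err) for _, err in failed])  # ['bad input: -1']
```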
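5. Handle Exceptions Properly
```python
import logging

logger = logging.getLogger(__name__)

async def safe_invoke(input_data: dict):
    try:
        return await graph.ainvoke(input_data)
    except Exception as e:
        # logger.exception() logs at ERROR level and appends the traceback.
        logger.exception("Error during graph execution: %s", e)
        raise  # re-raise so callers still see the failure
```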
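One reason `except Exception` is a reasonable choice here: since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so this handler does not swallow task cancellation (from timeouts or shutdown, for example). The sketch below checks that behavior, with a hypothetical `slow_invoke` coroutine in place of `graph.ainvoke`.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def slow_invoke(input_data: dict) -> dict:
    # Hypothetical stand-in for a long-running graph.ainvoke call.
    await asyncio.sleep(10)
    return input_data


async def safe_invoke(input_data: dict) -> dict:
    try:
        return await slow_invoke(input_data)
    except Exception:
        # Not reached on cancellation: CancelledError is a BaseException.
        logger.exception("Error during graph execution")
        raise


async def main() -> str:
    task = asyncio.create_task(safe_invoke({"q": "hi"}))
    await asyncio.sleep(0.01)  # let the task start waiting
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled cleanly"
    return "not cancelled"


print(asyncio.run(main()))  # cancelled cleanly
```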
Common Patterns
Pattern 1: Async with Streaming
```python
from agentflow import Message

async def process_with_streaming(query: str):
    """Process a query and yield streamed content as it arrives."""
    async for chunk in graph.astream({"messages": [Message.from_text(query)]}):
        if chunk.content_type == "message":
            # Forward content to the client as soon as it arrives
            yield chunk.content
```
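`process_with_streaming` is an async generator, so callers consume it with `async for`. To show the flow end to end without a real graph, this sketch replaces `graph.astream` with a hypothetical `fake_astream` whose `Chunk` objects only mimic the `content_type` and `content` attributes used above; Agentflow's real chunk type may carry more fields.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Chunk:
    # Hypothetical chunk shape mirroring the attributes used above.
    content_type: str
    content: str


async def fake_astream(query: str):
    # Hypothetical stand-in for graph.astream: yields chunks over time.
    yield Chunk("metadata", "run started")
    for word in f"echo: {query}".split():
        await asyncio.sleep(0)  # simulate waiting on the network
        yield Chunk("message", word)


async def process_with_streaming(query: str):
    async for chunk in fake_astream(query):
        if chunk.content_type == "message":
            yield chunk.content


async def main() -> list:
    received = []
    async for piece in process_with_streaming("hello world"):
        received.append(piece)  # e.g. write to a websocket or SSE response here
    return received


print(asyncio.run(main()))  # ['echo:', 'hello', 'world']
```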
Pattern 2: Rate-Limited Concurrent Processing
```python
import asyncio

async def batch_process_with_limit(items: list[dict], limit: int = 5):
    """Process items concurrently, with at most `limit` graph calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def process_with_limit(item):
        # Waits here whenever `limit` calls are already running
        async with semaphore:
            return await graph.ainvoke(item)

    tasks = [process_with_limit(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results
```
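To check that the semaphore really caps concurrency, this sketch counts calls in flight with a hypothetical `tracked_invoke` stand-in for `graph.ainvoke`. Note that a semaphore bounds concurrency, not requests per second; if an API enforces a per-second quota, you would need a time-based limiter on top.

```python
import asyncio

in_flight = 0
peak = 0


async def tracked_invoke(item: dict) -> dict:
    # Hypothetical stand-in for graph.ainvoke that records peak concurrency.
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return item


async def batch_process_with_limit(items: list, limit: int = 5) -> list:
    semaphore = asyncio.Semaphore(limit)

    async def process_with_limit(item):
        async with semaphore:
            return await tracked_invoke(item)

    return await asyncio.gather(*(process_with_limit(i) for i in items))


results = asyncio.run(batch_process_with_limit([{"i": i} for i in range(20)], limit=5))
print(len(results), peak)  # 20 5
```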
Pattern 3: Timeout Handling
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def invoke_with_timeout(input_data: dict, timeout: float = 30.0):
    """Invoke the graph, giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(graph.ainvoke(input_data), timeout=timeout)
    except asyncio.TimeoutError:  # an alias of the built-in TimeoutError on Python 3.11+
        logger.error("Graph execution timed out after %ss", timeout)
        raise
```
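It helps to know what happens to the work itself on timeout: `asyncio.wait_for` cancels the wrapped coroutine before raising, so cleanup code inside it still runs. A sketch with a hypothetical `slow_graph_call` standing in for `graph.ainvoke`:

```python
import asyncio

events = []


async def slow_graph_call() -> str:
    # Hypothetical stand-in for a graph run that takes too long.
    try:
        await asyncio.sleep(5)
        return "done"
    finally:
        # Runs when wait_for cancels this coroutine on timeout.
        events.append("inner cleanup ran")


async def main() -> str:
    try:
        return await asyncio.wait_for(slow_graph_call(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout caught")
        return "timed out"


print(asyncio.run(main()), events)
# timed out ['inner cleanup ran', 'timeout caught']
```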
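Pattern 4: Retry Logic
```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def invoke_with_retry(
    input_data: dict,
    max_retries: int = 3,
    backoff: float = 1.0,
):
    """Invoke the graph, retrying with exponential backoff (backoff, 2x, 4x, ...)."""
    for attempt in range(max_retries):
        try:
            # Each attempt creates a fresh coroutine; a coroutine can only be awaited once
            return await graph.ainvoke(input_data)
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            delay = backoff * (2 ** attempt)
            logger.warning(
                "Attempt %d/%d failed (%s); retrying in %.1fs",
                attempt + 1, max_retries, e, delay,
            )
            await asyncio.sleep(delay)
```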
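The same loop can be made reusable by passing a zero-argument factory that creates a fresh coroutine per attempt, since a coroutine object cannot be awaited twice. This sketch also adds random "full jitter" to each delay, a common variation (not part of the pattern above) that keeps many clients from retrying in lockstep. `retry_async` and `FlakyService` are hypothetical names, and the tiny `backoff` only keeps the demo fast.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    make_call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    backoff: float = 1.0,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await make_call()  # fresh coroutine on every attempt
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Full jitter: sleep a random time up to the exponential cap.
            await asyncio.sleep(random.uniform(0, backoff * (2 ** attempt)))
    raise AssertionError("unreachable")  # the loop always returns or raises


class FlakyService:
    """Hypothetical dependency that fails twice, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def ainvoke(self, input_data: dict) -> dict:
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("transient failure")
        return {"answer": input_data["q"] * 2}


service = FlakyService()
result = asyncio.run(retry_async(lambda: service.ainvoke({"q": 21}), backoff=0.01))
print(result, service.calls)  # {'answer': 42} 3
```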
Pattern 5: Graceful Shutdown with Signal Handling
```python
import asyncio
import logging

from agentflow.utils import GracefulShutdownManager

logger = logging.getLogger(__name__)

# build_graph(), initialize_resources() and get_next_input() are application-specific.
async def main():
    shutdown_manager = GracefulShutdownManager(shutdown_timeout=30.0)
    graph = build_graph().compile(shutdown_timeout=30.0)

    # Register signal handlers
    shutdown_manager.register_signal_handlers()

    try:
        # Protected initialization
        with shutdown_manager.protect_section():
            await initialize_resources()

        # Normal execution: stop once a shutdown has been requested
        while not shutdown_manager.shutdown_requested:
            await graph.ainvoke(get_next_input())
    except KeyboardInterrupt:
        logger.info("Shutdown requested via SIGINT")
    finally:
        # Protected cleanup
        with shutdown_manager.protect_section():
            await graph.aclose()
        shutdown_manager.unregister_signal_handlers()

if __name__ == "__main__":
    asyncio.run(main())
```
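For comparison, the same stop-when-signalled loop can be sketched with only the standard library, using `loop.add_signal_handler()` to set an `asyncio.Event`. This is not a description of how `GracefulShutdownManager` works internally; `run_until_signalled` is a hypothetical helper, and because `add_signal_handler()` is only available on Unix event loops (and only in the main thread), the sketch skips registration where it is unsupported.

```python
import asyncio
import signal
from typing import Awaitable, Callable, Optional


async def run_until_signalled(
    process_one: Callable[[], Awaitable[None]],
    stop: Optional[asyncio.Event] = None,
) -> int:
    """Call process_one() repeatedly until SIGINT/SIGTERM arrives or stop is set."""
    loop = asyncio.get_running_loop()
    if stop is None:
        stop = asyncio.Event()
    installed = []
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # The handler only sets the event; the loop below notices it
            # between iterations, so the in-progress item always finishes.
            loop.add_signal_handler(sig, stop.set)
            installed.append(sig)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows event loops, or not running in the main thread
    processed = 0
    try:
        while not stop.is_set():
            await process_one()
            processed += 1
    finally:
        # Restore default signal behaviour for the rest of the program.
        for sig in installed:
            loop.remove_signal_handler(sig)
    return processed
```

In an application you would pass a `process_one` coroutine function that awaits `graph.ainvoke(...)` for the next input, run it with `asyncio.run(...)`, and close the graph afterwards.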
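Migration Guide
Converting Sync to Async
If you're migrating from the sync APIs to the async ones:
- Change function signatures: `def` becomes `async def`
- Use async APIs: call `await graph.ainvoke(...)` instead of `graph.invoke(...)`
- Replace blocking calls: for example, replace `time.sleep()` with `await asyncio.sleep()` and `requests` with an async HTTP client such as `aiohttp`
- Update main entry point: run the top-level coroutine with `asyncio.run(main())`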
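Put together, the four steps look like this on a small handler function. `EchoGraph` is a hypothetical stand-in offering an `invoke()`/`ainvoke()` pair so that both versions run as-is; with Agentflow you would use your compiled graph instead.

```python
import asyncio
import time


class EchoGraph:
    """Hypothetical stand-in with a sync and an async entry point."""

    async def ainvoke(self, data: dict) -> dict:
        await asyncio.sleep(0)
        return {"echo": data["text"]}

    def invoke(self, data: dict) -> dict:
        return asyncio.run(self.ainvoke(data))


graph = EchoGraph()


# Before: synchronous
def handle_sync(text: str) -> dict:
    time.sleep(0.01)                     # blocking pause
    return graph.invoke({"text": text})  # sync API


# After: asynchronous
async def handle_async(text: str) -> dict:     # 1. def becomes async def
    await asyncio.sleep(0.01)                   # 3. non-blocking pause
    return await graph.ainvoke({"text": text})  # 2. async API


async def main() -> list:
    return await asyncio.gather(handle_async("a"), handle_async("b"))


if __name__ == "__main__":
    print(handle_sync("x"))     # {'echo': 'x'}
    print(asyncio.run(main()))  # 4. one entry point: [{'echo': 'a'}, {'echo': 'b'}]
```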
Examples
Full Async Application
```python
import asyncio

from agentflow import StateGraph, AgentState, Message
from agentflow.utils import GracefulShutdownManager


async def agent_node(state: AgentState) -> AgentState:
    """Process with async LLM call."""
    # Your async processing here
    return state


async def main():
    # Build graph
    graph = StateGraph()
    graph.add_node("agent", agent_node)
    graph.set_entry_point("agent")
    graph.add_edge("agent", "END")

    # Compile with shutdown configuration
    compiled = graph.compile(shutdown_timeout=30.0)

    # Setup graceful shutdown
    shutdown_manager = GracefulShutdownManager(shutdown_timeout=30.0)
    shutdown_manager.register_signal_handlers()

    try:
        # Process inputs
        result = await compiled.ainvoke({
            "messages": [Message.from_text("Hello")]
        })
        print(result)
    finally:
        # Graceful cleanup
        stats = await compiled.aclose()
        print(f"Shutdown stats: {stats}")
        shutdown_manager.unregister_signal_handlers()


if __name__ == "__main__":
    asyncio.run(main())
```
Sync Application (Simple Scripts)
```python
from agentflow import StateGraph, AgentState, Message


def agent_node(state: AgentState) -> AgentState:
    """Simple sync node."""
    return state


def main():
    # Build and compile
    graph = StateGraph()
    graph.add_node("agent", agent_node)
    graph.set_entry_point("agent")
    graph.add_edge("agent", "END")
    compiled = graph.compile()

    # Execute
    result = compiled.invoke({
        "messages": [Message.from_text("Hello")]
    })
    print(result)


if __name__ == "__main__":
    main()
```
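Performance Considerations
- Async shines with I/O-bound workloads: network calls, database queries, file I/O
- CPU-bound work doesn't benefit from async: a coroutine that computes holds the event loop until it finishes, so use multiprocessing (for example `concurrent.futures.ProcessPoolExecutor`) for CPU-intensive tasks
- Scheduling overhead: every `await` passes through the event loop, so for very simple, fast operations that never wait on I/O, plain sync code can be faster
- Memory usage: coroutines are much lighter than OS threads, so an async application usually needs less memory to keep many operations in flight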
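Debugging Tips
- Enable asyncio debug mode: `asyncio.run(main(), debug=True)`, or set the environment variable `PYTHONASYNCIODEBUG=1`
- Use logging to track async flow: in debug mode the `asyncio` logger reports slow callbacks and other problems
- Watch for unawaited coroutines: make sure warnings are not suppressed; in debug mode (for example `python -X dev`) the `RuntimeWarning: coroutine ... was never awaited` message also shows where the coroutine was created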
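A minimal sketch of the first two tips together: `debug=True` switches the loop into debug mode, and sending the `asyncio` logger's output to the console lets you see its reports. Here a deliberately blocking `time.sleep()` in a hypothetical `blocking_step` exceeds the loop's default slow-callback threshold of 0.1 s, so you should also see a warning from the `asyncio` logger about the slow step.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("asyncio").setLevel(logging.DEBUG)


async def blocking_step() -> bool:
    # Deliberately blocks the loop for 0.2 s, longer than the default
    # loop.slow_callback_duration of 0.1 s, so debug mode reports it.
    time.sleep(0.2)
    return asyncio.get_running_loop().get_debug()


print(asyncio.run(blocking_step(), debug=True))  # True
```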