PyAgenity Graph Module - Core Workflow Engine.
This module provides the foundational components for building and executing agent workflows in PyAgenity. It implements a graph-based execution model similar to LangGraph, where workflows are defined as directed graphs of interconnected nodes that process state and execute business logic.
Architecture Overview:¶
The graph module follows a builder pattern for workflow construction and provides a compiled execution environment for runtime performance. The core components work together to enable complex, stateful agent interactions:
- StateGraph: The primary builder class for constructing workflows
- Node: Executable units that encapsulate functions or tool operations
- Edge: Connections between nodes that define execution flow
- CompiledGraph: The executable runtime form of a constructed graph
- ToolNode: Specialized node for managing and executing tools
Core Components:¶
StateGraph
The main entry point for building workflows. Provides a fluent API for adding nodes, connecting them with edges, and configuring execution behavior. Supports both static and conditional routing between nodes.
Node
Represents an executable unit within the graph. Wraps functions or ToolNode instances and handles dependency injection, parameter mapping, and execution context. Supports both regular and streaming execution modes.
Edge
Defines connections between nodes, supporting both static (always followed) and conditional (state-dependent) routing. Enables complex branching logic and decision trees within workflows.
CompiledGraph
The executable runtime form created by compiling a StateGraph. Provides synchronous and asynchronous execution methods, state persistence, event publishing, and comprehensive error handling.
ToolNode
A specialized registry and executor for callable functions from various sources including local functions, MCP tools, Composio integrations, and LangChain tools. Supports automatic schema generation and unified tool execution.
Key Features:¶
- State Management: Persistent, typed state that flows between nodes
- Dependency Injection: Automatic injection of framework services
- Event Publishing: Comprehensive execution monitoring and debugging
- Streaming Support: Real-time incremental result processing
- Interrupts & Resume: Pauseable execution with checkpointing
- Tool Integration: Unified interface for various tool providers
- Type Safety: Generic typing for custom state classes
- Error Handling: Robust error recovery and callback mechanisms
Usage Example:¶
```python
from pyagenity.graph import StateGraph, ToolNode
from pyagenity.utils import START, END, Message


# Define workflow functions
def process_input(state, config):
    # Process user input
    result = analyze_input(state.context[-1].content)
    return [Message.text_message(f"Analysis: {result}")]


def generate_response(state, config):
    # Generate final response
    response = create_response(state.context)
    return [Message.text_message(response)]


# Create tools
def search_tool(query: str) -> str:
    return f"Search results for: {query}"


tools = ToolNode([search_tool])

# Build the graph
graph = StateGraph()
graph.add_node("process", process_input)
graph.add_node("search", tools)
graph.add_node("respond", generate_response)

# Define flow
graph.add_edge(START, "process")
graph.add_edge("process", "search")
graph.add_edge("search", "respond")
graph.add_edge("respond", END)

# Compile and execute
compiled = graph.compile()
result = compiled.invoke({"messages": [Message.text_message("Hello, world!")]})

# Cleanup
await compiled.aclose()
```
Integration Points:¶
The graph module integrates with other PyAgenity components:
- State Module: Provides AgentState and context management
- Utils Module: Supplies constants, messages, and helper functions
- Checkpointer Module: Enables state persistence and recovery
- Publisher Module: Handles event publishing and monitoring
- Adapters Module: Connects with external tools and services
This architecture provides a flexible, extensible foundation for building sophisticated agent workflows while maintaining simplicity for common use cases.
Modules:

Name | Description
---|---
compiled_graph |
edge | Graph edge representation and routing logic for PyAgenity workflows.
node | Node execution and management for PyAgenity graph workflows.
state_graph |
tool_node | ToolNode package.
utils |
Classes:

Name | Description
---|---
CompiledGraph | A fully compiled and executable graph ready for workflow execution.
Edge | Represents a connection between two nodes in a graph workflow.
Node | Represents a node in the graph workflow.
StateGraph | Main graph class for orchestrating multi-agent workflows.
ToolNode | A unified registry and executor for callable functions from various tool providers.
Attributes¶
Classes¶
CompiledGraph¶
A fully compiled and executable graph ready for workflow execution.
CompiledGraph represents the final executable form of a StateGraph after compilation. It encapsulates all the execution logic, handlers, and services needed to run agent workflows. The graph supports both synchronous and asynchronous execution with comprehensive state management, checkpointing, event publishing, and streaming capabilities.
This class is generic over state types to support custom AgentState subclasses, ensuring type safety throughout the execution process.
Key Features:
- Synchronous and asynchronous execution methods
- Real-time streaming with incremental results
- State persistence and checkpointing
- Interrupt and resume capabilities
- Event publishing for monitoring and debugging
- Background task management
- Graceful error handling and recovery
Attributes:

Name | Type | Description
---|---|---
_state | | The initial/template state for graph executions.
_invoke_handler | InvokeHandler[StateT] | Handler for non-streaming graph execution.
_stream_handler | StreamHandler[StateT] | Handler for streaming graph execution.
_checkpointer | BaseCheckpointer[StateT] \| None | Optional state persistence backend.
_publisher | BasePublisher \| None | Optional event publishing backend.
_store | BaseStore \| None | Optional data storage backend.
_state_graph | StateGraph[StateT] | Reference to the source StateGraph.
_interrupt_before | list[str] | Nodes where execution should pause before execution.
_interrupt_after | list[str] | Nodes where execution should pause after execution.
_task_manager | | Manager for background async tasks.
Example

```python
# After building and compiling a StateGraph
compiled = graph.compile()

# Synchronous execution
result = compiled.invoke({"messages": [Message.text_message("Hello")]})

# Asynchronous execution with streaming
async for chunk in compiled.astream({"messages": [message]}):
    print(f"Streamed: {chunk.content}")

# Graceful cleanup
await compiled.aclose()
```
Note
CompiledGraph instances should be properly closed using aclose() to release resources like database connections, background tasks, and event publishers.
Methods:

Name | Description
---|---
__init__ |
aclose | Close the graph and release any resources.
ainvoke | Execute the graph asynchronously.
astop | Request the current graph execution to stop (async).
astream | Execute the graph asynchronously with streaming support.
generate_graph | Generate the graph representation.
invoke | Execute the graph synchronously and return the final results.
stop | Request the current graph execution to stop (sync helper).
stream | Execute the graph synchronously with streaming support.
Source code in pyagenity/graph/compiled_graph.py
Functions¶
__init__¶
__init__(state, checkpointer, publisher, store, state_graph, interrupt_before, interrupt_after, task_manager)
Source code in pyagenity/graph/compiled_graph.py
aclose async¶
aclose()
Close the graph and release any resources.
Source code in pyagenity/graph/compiled_graph.py
ainvoke async¶
ainvoke(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously.
Auto-detects whether to start fresh execution or resume from interrupted state based on the AgentState's execution metadata.
Parameters:

Name | Type | Description | Default
---|---|---|---
input_data | dict[str, Any] | Input dict with 'messages' key (for new execution) or additional data for resuming | required
config | dict[str, Any] \| None | Configuration dictionary | None
response_granularity | ResponseGranularity | Response parsing granularity | LOW

Returns:

Type | Description
---|---
dict[str, Any] | Response dict based on granularity

Source code in pyagenity/graph/compiled_graph.py
astop async¶
astop(config)
Request the current graph execution to stop (async).
Contract:
- Requires a valid thread_id in config
- If no active thread or no checkpointer, returns not-running
- If state exists and is running, set stop_requested flag in thread info
Source code in pyagenity/graph/compiled_graph.py
astream async¶
astream(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with streaming support.
Yields Message objects containing incremental responses. If nodes return streaming responses, yields them directly. If nodes return complete responses, simulates streaming by chunking.
Parameters:

Name | Type | Description | Default
---|---|---|---
input_data | dict[str, Any] | Input dict | required
config | dict[str, Any] \| None | Configuration dictionary | None
response_granularity | ResponseGranularity | Response parsing granularity | LOW

Yields:

Type | Description
---|---
AsyncIterator[Message] | Message objects with incremental content

Source code in pyagenity/graph/compiled_graph.py
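The "simulates streaming by chunking" behavior described above can be sketched in plain Python. This is an illustrative model only; the chunk size and chunking strategy are assumptions, not the library's actual implementation:

```python
def chunk_text(text: str, size: int = 8):
    """Split a complete response into fixed-size pieces to simulate streaming."""
    for start in range(0, len(text), size):
        yield text[start:start + size]


# A complete node response is yielded piece by piece, as if it were streamed.
chunks = list(chunk_text("The quick brown fox jumps over the lazy dog", size=10))
```

Joining the chunks reproduces the original complete response, which is why consumers can treat simulated and genuine streams uniformly.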
generate_graph¶
generate_graph()
Generate the graph representation.
Returns:

Type | Description
---|---
dict[str, Any] | A dictionary representing the graph structure.

Source code in pyagenity/graph/compiled_graph.py
invoke¶
invoke(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph synchronously and return the final results.
Runs the complete graph workflow from start to finish, handling state management, node execution, and result formatting. This method automatically detects whether to start a fresh execution or resume from an interrupted state.
The execution is synchronous but internally uses async operations, making it suitable for use in non-async contexts while still benefiting from async capabilities for I/O operations.
Parameters:

Name | Type | Description | Default
---|---|---|---
input_data | dict[str, Any] | Input dictionary for graph execution. For new executions, should contain 'messages' key with list of initial messages. For resumed executions, can contain additional data to merge. | required
config | dict[str, Any] \| None | Optional configuration dictionary containing execution settings: user_id (identifier for the user/session), thread_id (unique identifier for this execution thread), run_id (unique identifier for this specific run), recursion_limit (maximum steps before stopping; default 25). | None
response_granularity | ResponseGranularity | Level of detail in the response: LOW returns only messages (default); PARTIAL returns context, summary, and messages; FULL returns complete state and messages. | LOW

Returns:

Type | Description
---|---
dict[str, Any] | Dictionary containing execution results formatted according to the specified granularity level. Always includes execution messages and may include additional state information.

Raises:

Type | Description
---|---
ValueError | If input_data is invalid for new execution.
GraphRecursionError | If execution exceeds recursion limit.
Various exceptions | Depending on node execution failures.
Example

```python
# Basic execution
result = compiled.invoke({"messages": [Message.text_message("Process this data")]})
print(result["messages"])  # Final execution messages

# With configuration and full details
result = compiled.invoke(
    input_data={"messages": [message]},
    config={"user_id": "user123", "thread_id": "session456", "recursion_limit": 50},
    response_granularity=ResponseGranularity.FULL,
)
print(result["state"])  # Complete final state
```
Note
This method uses asyncio.run() internally, so it should not be called from within an async context. Use ainvoke() instead for async execution.
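The sync-over-async pattern the note describes can be sketched as follows. The `_run_graph` coroutine is a hypothetical stand-in for the graph's internal async execution, not the library's actual code:

```python
import asyncio


async def _run_graph():
    # Placeholder for the graph's internal async execution (hypothetical)
    return {"messages": ["done"]}


def invoke_sync():
    """Sketch of the sync-over-async pattern invoke() relies on."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running: safe to drive the coroutine with asyncio.run()
        return asyncio.run(_run_graph())
    raise RuntimeError("called from an async context; use ainvoke() instead")


result = invoke_sync()
```

This is why calling invoke() from inside a coroutine fails: asyncio.run() cannot be nested within an already-running event loop.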
Source code in pyagenity/graph/compiled_graph.py
stop¶
stop(config)
Request the current graph execution to stop (sync helper).
This sets a stop flag in the checkpointer's thread store keyed by thread_id. Handlers periodically check this flag and interrupt execution. Returns a small status dict.
Source code in pyagenity/graph/compiled_graph.py
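The stop contract amounts to a cooperative flag check between steps. A minimal sketch, assuming an in-memory store keyed by thread_id (the real implementation uses the checkpointer's thread store):

```python
import threading


class StopFlags:
    """Illustrative in-memory stand-in for the checkpointer's thread store."""

    def __init__(self):
        self._flags: dict[str, bool] = {}
        self._lock = threading.Lock()

    def request_stop(self, thread_id: str) -> dict:
        # What stop()/astop() do conceptually: record the request and
        # return a small status dict.
        with self._lock:
            self._flags[thread_id] = True
        return {"thread_id": thread_id, "status": "stop_requested"}

    def stop_requested(self, thread_id: str) -> bool:
        # Handlers poll this between steps and interrupt execution when set.
        with self._lock:
            return self._flags.get(thread_id, False)


flags = StopFlags()
status = flags.request_stop("session456")
```

Because the check is cooperative, a long-running node finishes its current step before the interruption takes effect.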
stream¶
stream(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph synchronously with streaming support.
Yields Message objects containing incremental responses. If nodes return streaming responses, yields them directly. If nodes return complete responses, simulates streaming by chunking.
Parameters:

Name | Type | Description | Default
---|---|---|---
input_data | dict[str, Any] | Input dict | required
config | dict[str, Any] \| None | Configuration dictionary | None
response_granularity | ResponseGranularity | Response parsing granularity | LOW

Yields:

Type | Description
---|---
Generator[Message] | Message objects with incremental content

Source code in pyagenity/graph/compiled_graph.py
Edge¶
Represents a connection between two nodes in a graph workflow.
An Edge defines the relationship and routing logic between nodes, specifying how execution should flow from one node to another. Edges can be either static (unconditional) or conditional based on runtime state evaluation.
Edges support complex routing scenarios including:
- Simple static connections between nodes
- Conditional routing based on state evaluation
- Dynamic routing with multiple possible destinations
- Decision trees and branching logic
Attributes:

Name | Type | Description
---|---|---
from_node | | Name of the source node where execution originates.
to_node | | Name of the destination node where execution continues.
condition | | Optional callable that determines if this edge should be followed. If None, the edge is always followed (static edge).
condition_result | str \| None | Optional value to match against condition result for mapped conditional edges.
Example

```python
# Static edge - always followed
static_edge = Edge("start", "process")


# Conditional edge - followed only if condition returns True
def needs_approval(state):
    return state.data.get("requires_approval", False)


conditional_edge = Edge("process", "approval", condition=needs_approval)


# Mapped conditional edge - follows based on specific condition result
def get_priority(state):
    return state.data.get("priority", "normal")


high_priority_edge = Edge("triage", "urgent", condition=get_priority)
high_priority_edge.condition_result = "high"
```
Methods:

Name | Description
---|---
__init__ | Initialize a new Edge with source, destination, and optional condition.

Source code in pyagenity/graph/edge.py
Attributes¶
Functions¶
__init__¶
__init__(from_node, to_node, condition=None)
Initialize a new Edge with source, destination, and optional condition.
Parameters:

Name | Type | Description | Default
---|---|---|---
from_node | str | Name of the source node. Must match a node name in the graph. | required
to_node | str | Name of the destination node. Must match a node name in the graph or be a special constant like END. | required
condition | Callable \| None | Optional callable that takes an AgentState as argument and returns a value to determine if this edge should be followed. If None, this is a static edge that's always followed. | None
Note
The condition function should be deterministic and side-effect free for predictable execution behavior. It receives the current AgentState and should return a boolean (for simple conditions) or a string/value (for mapped conditional routing).
Source code in pyagenity/graph/edge.py
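Putting the three edge kinds together, routing resolution can be sketched in plain Python. This is a simplified model of the documented semantics, not the library's actual resolver:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SketchEdge:
    from_node: str
    to_node: str
    condition: Optional[Callable] = None
    condition_result: Optional[str] = None  # set for mapped conditional edges


def next_node(edges, current, state):
    """Return the destination of the first edge from `current` that applies."""
    for edge in edges:
        if edge.from_node != current:
            continue
        if edge.condition is None:
            return edge.to_node  # static edge: always followed
        result = edge.condition(state)
        if edge.condition_result is not None:
            if result == edge.condition_result:  # mapped conditional edge
                return edge.to_node
        elif result:  # boolean conditional edge
            return edge.to_node
    return None


edges = [
    SketchEdge("triage", "urgent",
               condition=lambda s: s["priority"], condition_result="high"),
    SketchEdge("triage", "normal_queue",
               condition=lambda s: s["priority"], condition_result="normal"),
]
```

With state `{"priority": "high"}`, resolution from "triage" selects "urgent"; an unmatched result selects no edge.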
Node¶
Represents a node in the graph workflow.
A Node encapsulates a function or ToolNode that can be executed as part of a graph workflow. It handles dependency injection, parameter mapping, and execution context management.
The Node class supports both regular callable functions and ToolNode instances for handling tool-based operations. It automatically injects dependencies based on function signatures and provides legacy parameter support.
Attributes:

Name | Type | Description
---|---|---
name | str | Unique identifier for the node within the graph.
func | Union[Callable, ToolNode] | The function or ToolNode to execute.
Example
```python
def my_function(state, config):
    return {"result": "processed"}

node = Node("processor", my_function)
result = await node.execute(state, config)
```
Methods:

Name | Description
---|---
__init__ | Initialize a new Node instance with function and dependencies.
execute | Execute the node function with comprehensive context and callback support.
stream | Execute the node function with streaming output support.

Source code in pyagenity/graph/node.py
Attributes¶
Functions¶
__init__¶
__init__(name, func, publisher=Inject[BasePublisher])
Initialize a new Node instance with function and dependencies.
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | Unique identifier for the node within the graph. This name is used for routing, logging, and referencing the node in graph configuration. | required
func | Union[Callable, ToolNode] | The function or ToolNode to execute when this node is called. Functions should accept at least 'state' and 'config' parameters. ToolNode instances handle tool-based operations and provide their own execution logic. | required
publisher | BasePublisher \| None | Optional event publisher for execution monitoring. Injected via dependency injection if not explicitly provided. Used for publishing node execution events and status updates. | Inject[BasePublisher]
Note
The function signature is automatically analyzed to determine required parameters and dependency injection points. Parameters matching injectable service names will be automatically provided by the framework during execution.
Source code in pyagenity/graph/node.py
execute async¶
execute(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with comprehensive context and callback support.
Executes the node's function or ToolNode with full dependency injection, callback hook execution, and error handling. This method provides the complete execution environment including state access, configuration, and injected services.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | dict[str, Any] | Configuration dictionary containing execution context, user settings, thread identification, and runtime parameters. | required
state | AgentState | Current AgentState providing workflow context, message history, and shared state information accessible to the node function. | required
callback_mgr | CallbackManager | Callback manager for executing pre/post execution hooks. Injected via dependency injection if not explicitly provided. | Inject[CallbackManager]

Returns:

Type | Description
---|---
dict[str, Any] \| list[Message] | Either a dictionary containing updated state and execution results, or a list of Message objects representing the node's output. The return type depends on the node function's implementation.
Example
```python
# Node function that returns messages
def process_data(state, config):
    result = process(state.data)
    return [Message.text_message(f"Processed: {result}")]

node = Node("processor", process_data)
messages = await node.execute(config, state)
```
Note
The node function receives dependency-injected parameters based on its signature. Common injectable parameters include 'state', 'config', 'context_manager', 'publisher', and other framework services.
Source code in pyagenity/graph/node.py
stream async¶
stream(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with streaming output support.
Similar to execute() but designed for streaming scenarios where the node function can produce incremental results. This method provides an async iterator interface over the node's outputs, allowing for real-time processing and response streaming.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | dict[str, Any] | Configuration dictionary with execution context and settings. | required
state | AgentState | Current AgentState providing workflow context and shared state. | required
callback_mgr | CallbackManager | Callback manager for pre/post execution hook handling. | Inject[CallbackManager]

Yields:

Type | Description
---|---
AsyncIterable[dict[str, Any] \| Message] | Dictionary objects or Message instances representing incremental outputs from the node function. The exact type and frequency of yields depends on the node function's streaming implementation.
Example
```python
async def streaming_processor(state, config):
    for item in large_dataset:
        result = process_item(item)
        yield Message.text_message(f"Processed item: {result}")

node = Node("stream_processor", streaming_processor)
async for output in node.stream(config, state):
    print(f"Streamed: {output.content}")
```
Note
Not all node functions support streaming. For non-streaming functions, this method will yield a single result equivalent to calling execute(). The streaming capability is determined by the node function's implementation.
Source code in pyagenity/graph/node.py
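The fallback for non-streaming functions described in the note can be sketched as a small adapter. This is an assumption about the mechanism, not the actual node internals:

```python
import asyncio
import inspect


async def as_stream(func, *args):
    """Yield incrementally from (async) generators; otherwise yield once."""
    result = func(*args)
    if inspect.isasyncgen(result):
        async for item in result:
            yield item
    elif inspect.isgenerator(result):
        for item in result:
            yield item
    else:
        if inspect.isawaitable(result):
            result = await result
        # Non-streaming function: a single yield, equivalent to execute()
        yield result


async def collect(func, *args):
    return [item async for item in as_stream(func, *args)]


outputs = asyncio.run(collect(lambda: "complete response"))
```

A plain function thus appears as a one-item stream, so callers can always consume node output through the same async-iterator interface.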
StateGraph¶
Main graph class for orchestrating multi-agent workflows.
This class provides the core functionality for building and managing stateful agent workflows. It is similar to LangGraph's StateGraph, with added support for dependency injection.
The graph is generic over state types to support custom AgentState subclasses, allowing for type-safe state management throughout the workflow execution.
Attributes:

Name | Type | Description
---|---|---
state | StateT | The current state of the graph workflow.
nodes | dict[str, Node] | Collection of nodes in the graph.
edges | list[Edge] | Collection of edges connecting nodes.
entry_point | str \| None | Name of the starting node for execution.
context_manager | BaseContextManager[StateT] \| None | Optional context manager for handling cross-node state operations.
dependency_container | DependencyContainer | Container for managing dependencies that can be injected into node functions.
compiled | bool | Whether the graph has been compiled for execution.
Example
```python
graph = StateGraph()
graph.add_node("process", process_function)
graph.add_edge(START, "process")
graph.add_edge("process", END)
compiled = graph.compile()
result = compiled.invoke({"input": "data"})
```
Methods:

Name | Description
---|---
__init__ | Initialize a new StateGraph instance.
add_conditional_edges | Add conditional routing between nodes based on runtime evaluation.
add_edge | Add a static edge between two nodes.
add_node | Add a node to the graph.
compile | Compile the graph for execution.
set_entry_point | Set the entry point for the graph.

Source code in pyagenity/graph/state_graph.py
Attributes¶
Functions¶
__init__¶
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None, thread_name_generator=None)
Initialize a new StateGraph instance.
Parameters:

Name | Type | Description | Default
---|---|---|---
state | StateT \| None | Initial state for the graph. If None, a default AgentState will be created. | None
context_manager | BaseContextManager[StateT] \| None | Optional context manager for handling cross-node state operations and advanced state management patterns. | None
container | | Container for managing dependencies that can be injected into node functions. If None, a new empty container will be created. | None
publisher | BasePublisher \| None | Publisher for emitting events during execution | None
Note
START and END nodes are automatically added to the graph upon initialization and accept the full node signature including dependencies.
Example
```python
# Basic usage with default AgentState
graph = StateGraph()

# With custom state
custom_state = MyCustomState()
graph = StateGraph(custom_state)

# Or using type hints for clarity
graph = StateGraph[MyCustomState](custom_state)
```
Source code in pyagenity/graph/state_graph.py
add_conditional_edges¶
add_conditional_edges(from_node, condition, path_map=None)
Add conditional routing between nodes based on runtime evaluation.
Creates dynamic routing logic where the next node is determined by evaluating a condition function against the current state. This enables complex branching logic, decision trees, and adaptive workflow routing.
Parameters:

Name | Type | Description | Default
---|---|---|---
from_node | str | Name of the source node where the condition is evaluated. | required
condition | Callable | Callable function that takes the current AgentState and returns a value used for routing decisions. Should be deterministic and side-effect free. | required
path_map | dict[str, str] \| None | Optional dictionary mapping condition results to destination nodes. If provided, the condition's return value is looked up in this mapping. If None, the condition should return the destination node name directly. | None

Returns:

Name | Type | Description
---|---|---
StateGraph | StateGraph | The graph instance for method chaining.

Raises:

Type | Description
---|---
ValueError | If the condition function or path_map configuration is invalid.
Example
# Direct routing - condition returns node name
def route_by_priority(state):
priority = state.data.get("priority", "normal")
return "urgent_handler" if priority == "high" else "normal_handler"
graph.add_conditional_edges("classifier", route_by_priority)
# Mapped routing - condition result mapped to nodes
def get_category(state):
return state.data.get("category", "default")
category_map = {
"finance": "finance_processor",
"legal": "legal_processor",
"default": "general_processor",
}
graph.add_conditional_edges("categorizer", get_category, category_map)
Note
The condition function receives the current AgentState and should return consistent results for the same state. If using path_map, ensure the condition's return values match the map keys exactly.
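The mapped-routing behavior described above can be sketched as a small standalone resolver. This is a hypothetical helper for illustration, not the library's internal code, and it uses a plain dict as a stand-in for AgentState:

```python
# Hypothetical sketch of how mapped conditional routing resolves the next
# node; the real resolution happens inside the compiled graph at runtime.
def resolve_next_node(condition, state, path_map=None):
    result = condition(state)
    if path_map is None:
        # Direct routing: the condition returns the node name itself.
        return result
    # Mapped routing: the condition's return value is looked up in path_map.
    if result not in path_map:
        raise ValueError(f"Condition result {result!r} not found in path_map")
    return path_map[result]

# Example condition working on a minimal dict-based stand-in for AgentState.
def get_category(state):
    return state.get("category", "default")

category_map = {"finance": "finance_processor", "default": "general_processor"}
```

Note how the same condition function works for both styles: without a path_map its return value is the destination node name; with a path_map it is only a lookup key.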
Source code in pyagenity/graph/state_graph.py
add_edge
¶
add_edge(from_node, to_node)
Add a static edge between two nodes.
Creates a direct connection from one node to another. If the source node is START, the target node becomes the entry point for the graph.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_node | str | Name of the source node. | required |
to_node | str | Name of the target node. | required |
Returns:
Name | Type | Description |
---|---|---|
StateGraph | StateGraph | The graph instance for method chaining. |
Example
graph.add_edge("node1", "node2")
graph.add_edge(START, "entry_node")  # Sets entry point
Source code in pyagenity/graph/state_graph.py
add_node
¶
add_node(name_or_func, func=None)
Add a node to the graph.
This method supports two calling patterns:
1. Pass a callable as the first argument (the name is inferred from the function name)
2. Pass a name string and a callable/ToolNode as separate arguments
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name_or_func | str \| Callable | Either the node name (str) or a callable function. If callable, the function name will be used as the node name. | required |
func | Callable \| ToolNode \| None | The function or ToolNode to execute. Required if name_or_func is a string, ignored if name_or_func is callable. | None |
Returns:
Name | Type | Description |
---|---|---|
StateGraph | StateGraph | The graph instance for method chaining. |
Raises:
Type | Description |
---|---|
ValueError | If invalid arguments are provided. |
Example
Method 1: Function name inferred¶
graph.add_node(my_function)
Method 2: Explicit name and function¶
graph.add_node("process", my_function)
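The name-inference rule from pattern 1 can be shown with a tiny standalone helper. This is an illustrative sketch of the documented argument handling, not the library's internal implementation:

```python
# Hypothetical helper mirroring add_node's two calling patterns: a lone
# callable contributes its __name__ as the node name; a string name takes
# precedence over the function's own name.
def infer_node_name(name_or_func, func=None):
    if callable(name_or_func):
        return name_or_func.__name__, name_or_func
    if func is None:
        raise ValueError("func is required when name_or_func is a string")
    return name_or_func, func

def my_function(state):
    return state

inferred_name, _ = infer_node_name(my_function)        # pattern 1
explicit_name, _ = infer_node_name("process", my_function)  # pattern 2
```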
Source code in pyagenity/graph/state_graph.py
compile
¶
compile(checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Compile the graph for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
checkpointer | BaseCheckpointer[StateT] \| None | Checkpointer for state persistence. | None |
store | BaseStore \| None | Store for additional data. | None |
interrupt_before | list[str] \| None | List of node names to interrupt before execution. | None |
interrupt_after | list[str] \| None | List of node names to interrupt after execution. | None |
callback_manager | CallbackManager | Callback manager for executing hooks. | CallbackManager() |
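One responsibility a compile step like this plausibly has is validating that interrupt_before and interrupt_after only name registered nodes. The helper below is an assumed sketch of that contract, not the library's actual compile() code:

```python
# Hedged sketch: verify interrupt lists against the set of registered node
# names before producing an executable graph (assumed compile-time check).
def validate_interrupts(nodes, interrupt_before=None, interrupt_after=None):
    known = set(nodes)
    for label, names in (("interrupt_before", interrupt_before or []),
                         ("interrupt_after", interrupt_after or [])):
        unknown = [n for n in names if n not in known]
        if unknown:
            raise ValueError(f"{label} references unknown nodes: {unknown}")
    return True
```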
Source code in pyagenity/graph/state_graph.py
set_entry_point
¶
set_entry_point(node_name)
Set the entry point for the graph.
Source code in pyagenity/graph/state_graph.py
ToolNode
¶
Bases: SchemaMixin
, LocalExecMixin
, MCPMixin
, ComposioMixin
, LangChainMixin
, KwargsResolverMixin
A unified registry and executor for callable functions from various tool providers.
ToolNode serves as the central hub for managing and executing tools from multiple sources:
- Local Python functions
- MCP (Model Context Protocol) tools
- Composio adapter tools
- LangChain tools
The class uses a mixin-based architecture to separate concerns and maintain clean integration with different tool providers. It provides both synchronous and asynchronous execution methods with comprehensive event publishing and error handling.
Attributes:
Name | Type | Description |
---|---|---|
_funcs | dict[str, Callable] | Dictionary mapping function names to callable functions. |
_client | Client \| None | Optional MCP client for remote tool execution. |
_composio | ComposioAdapter \| None | Optional Composio adapter for external integrations. |
_langchain | Any \| None | Optional LangChain adapter for LangChain tools. |
mcp_tools | list[str] | List of available MCP tool names. |
composio_tools | list[str] | List of available Composio tool names. |
langchain_tools | list[str] | List of available LangChain tool names. |
Example
# Define local tools
def weather_tool(location: str) -> str:
return f"Weather in {location}: Sunny, 25°C"
def calculator(a: int, b: int) -> int:
return a + b
# Create ToolNode with local functions
tools = ToolNode([weather_tool, calculator])
# Execute a tool
result = await tools.invoke(
name="weather_tool",
args={"location": "New York"},
tool_call_id="call_123",
config={"user_id": "user1"},
state=agent_state,
)
Methods:
Name | Description |
---|---|
__init__ | Initialize ToolNode with functions and optional tool adapters. |
all_tools | Get all available tools from all configured providers. |
all_tools_sync | Synchronously get all available tools from all configured providers. |
get_local_tool | Generate OpenAI-compatible tool definitions for all registered local functions. |
invoke | Execute a specific tool by name with the provided arguments. |
stream | Execute a tool with streaming support, yielding incremental results. |
Source code in pyagenity/graph/tool_node/base.py
Attributes¶
Functions¶
__init__
¶
__init__(functions, client=None, composio_adapter=None, langchain_adapter=None)
Initialize ToolNode with functions and optional tool adapters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
functions | Iterable[Callable] | Iterable of callable functions to register as tools. Each function is registered under its function name. | required |
client | Client \| None | Optional MCP (Model Context Protocol) client for remote tool access. Requires the 'fastmcp' and 'mcp' packages to be installed. | None |
composio_adapter | ComposioAdapter \| None | Optional Composio adapter for external integrations and third-party API access. | None |
langchain_adapter | Any \| None | Optional LangChain adapter for accessing LangChain tools and integrations. | None |
Raises:
Type | Description |
---|---|
ImportError | If an MCP client is provided but the required packages are not installed. |
TypeError | If any item in functions is not callable. |
Note
When using MCP client functionality, ensure you have installed the required dependencies with: pip install pyagenity[mcp]
Source code in pyagenity/graph/tool_node/base.py
all_tools
async
¶
all_tools()
Get all available tools from all configured providers.
Retrieves and combines tool definitions from local functions, MCP client, Composio adapter, and LangChain adapter. Each tool definition includes the function schema with parameters and descriptions.
Returns:
Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. Each dict contains 'type': 'function' and 'function' with name, description, and parameters schema. |
Example
tools = await tool_node.all_tools()
# Returns:
# [
# {
# "type": "function",
# "function": {
# "name": "weather_tool",
# "description": "Get weather information for a location",
# "parameters": {
# "type": "object",
# "properties": {
# "location": {"type": "string"}
# },
# "required": ["location"]
# }
# }
# }
# ]
Source code in pyagenity/graph/tool_node/base.py
all_tools_sync
¶
all_tools_sync()
Synchronously get all available tools from all configured providers.
This is a synchronous wrapper around the async all_tools() method. It uses asyncio.run() to handle async operations from MCP, Composio, and LangChain adapters.
Returns:
Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. |
Note
Prefer using the async all_tools() method when possible, especially in async contexts, to avoid potential event loop issues.
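The sync-wrapper pattern this note describes is simple to demonstrate in isolation. The class and method names below are illustrative stand-ins, not the library's actual classes:

```python
import asyncio

# Sketch of the all_tools_sync() pattern: a synchronous wrapper driving the
# async method with asyncio.run(). Calling the sync wrapper from inside a
# running event loop would raise, which is why the async variant is
# preferred in async contexts.
class ToolRegistry:
    async def all_tools(self):
        await asyncio.sleep(0)  # stands in for async adapter calls
        return [{"type": "function", "function": {"name": "weather_tool"}}]

    def all_tools_sync(self):
        return asyncio.run(self.all_tools())

tools = ToolRegistry().all_tools_sync()
```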
Source code in pyagenity/graph/tool_node/base.py
get_local_tool
¶
get_local_tool()
Generate OpenAI-compatible tool definitions for all registered local functions.
Inspects all registered functions in _funcs and automatically generates tool schemas by analyzing function signatures, type annotations, and docstrings. Excludes injectable parameters that are provided by the framework.
Returns:
Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. Each definition includes the function name, description (from docstring), and complete parameter schema with types and required fields. |
Example
For a function:
def calculate(a: int, b: int, operation: str = "add") -> int:
'''Perform arithmetic calculation.'''
return a + b if operation == "add" else a - b
Returns:
[
{
"type": "function",
"function": {
"name": "calculate",
"description": "Perform arithmetic calculation.",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "integer"},
"b": {"type": "integer"},
"operation": {"type": "string", "default": "add"},
},
"required": ["a", "b"],
},
},
}
]
Note
Parameters listed in INJECTABLE_PARAMS (like 'state', 'config', 'tool_call_id') are automatically excluded from the generated schema as they are provided by the framework during execution.
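The signature-to-schema process described above can be sketched with the standard inspect module. This is an assumed, simplified version of the behavior, not the library's schema generator; the injectable-parameter set and type mapping are illustrative:

```python
import inspect

# Framework-provided parameters excluded from generated schemas (assumed
# to mirror INJECTABLE_PARAMS in spirit, not necessarily in content).
INJECTABLE_PARAMS = {"state", "config", "tool_call_id"}
TYPE_MAP = {int: "integer", str: "string", float: "number", bool: "boolean"}

def build_schema(func):
    """Build an OpenAI-style function schema from a Python signature."""
    props, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        if name in INJECTABLE_PARAMS:
            continue  # injected by the framework, not exposed to the model
        entry = {"type": TYPE_MAP.get(param.annotation, "string")}
        if param.default is not inspect.Parameter.empty:
            entry["default"] = param.default
        else:
            required.append(name)
        props[name] = entry
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props,
                           "required": required},
        },
    }

def calculate(a: int, b: int, operation: str = "add", state=None) -> int:
    """Perform arithmetic calculation."""
    return a + b if operation == "add" else a - b
```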
Source code in pyagenity/graph/tool_node/schema.py
invoke
async
¶
invoke(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a specific tool by name with the provided arguments.
This method handles tool execution across all configured providers (local, MCP, Composio, LangChain) with comprehensive error handling, event publishing, and callback management.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the tool to execute. | required |
args | dict | Dictionary of arguments to pass to the tool function. | required |
tool_call_id | str | Unique identifier for this tool execution, used for tracking and result correlation. | required |
config | dict[str, Any] | Configuration dictionary containing execution context and user-specific settings. | required |
state | AgentState | Current agent state for context-aware tool execution. | required |
callback_manager | CallbackManager | Manager for executing pre/post execution callbacks. Injected via dependency injection if not provided. | Inject[CallbackManager] |
Returns:
Type | Description |
---|---|
Any | Message object containing tool execution results, either successful output or error information with appropriate status indicators. |
Example
result = await tool_node.invoke(
name="weather_tool",
args={"location": "Paris", "units": "metric"},
tool_call_id="call_abc123",
config={"user_id": "user1", "session_id": "session1"},
state=current_agent_state,
)
# result is a Message with tool execution results
print(result.content) # Tool output or error information
Note
The method publishes execution events throughout the process for monitoring and debugging purposes. Tool execution is routed based on tool provider precedence: MCP → Composio → LangChain → Local.
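The provider precedence named in the note (MCP → Composio → LangChain → Local) amounts to a first-match lookup chain. The sketch below models each provider as a plain dict; this is an illustration of the documented ordering, not the library's routing code:

```python
# Sketch of the documented routing precedence: the first provider that
# knows the tool name wins. Registries are dicts standing in for adapters.
def route_tool(name, mcp, composio, langchain, local):
    for provider, registry in (("mcp", mcp), ("composio", composio),
                               ("langchain", langchain), ("local", local)):
        if name in registry:
            return provider, registry[name]
    raise KeyError(f"Unknown tool: {name}")
```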
Source code in pyagenity/graph/tool_node/base.py
stream
async
¶
stream(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a tool with streaming support, yielding incremental results.
Similar to invoke() but designed for tools that can provide streaming responses or when you want to process results as they become available. Currently, most tool providers return complete results, so this method typically yields a single Message with the full result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the tool to execute. | required |
args | dict | Dictionary of arguments to pass to the tool function. | required |
tool_call_id | str | Unique identifier for this tool execution. | required |
config | dict[str, Any] | Configuration dictionary containing execution context. | required |
state | AgentState | Current agent state for context-aware tool execution. | required |
callback_manager | CallbackManager | Manager for executing pre/post execution callbacks. | Inject[CallbackManager] |
Yields:
Type | Description |
---|---|
AsyncIterator[Message] | Message objects containing tool execution results or status updates. For most tools, this will yield a single complete result Message. |
Example
async for message in tool_node.stream(
name="data_processor",
args={"dataset": "large_data.csv"},
tool_call_id="call_stream123",
config={"user_id": "user1"},
state=current_state,
):
print(f"Received: {message.content}")
# Process each streamed result
Note
The streaming interface is designed for future expansion where tools may provide true streaming responses. Currently, it provides a consistent async iterator interface over tool results.
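The "single yield over a complete result" behavior this note describes is easy to model with an async generator. Names here are illustrative stand-ins, assuming the documented behavior:

```python
import asyncio

# Sketch of the current stream() behavior: run the tool to completion, then
# yield the whole result once through a consistent async-iterator interface.
async def stream_tool_result(run_tool, args):
    result = await run_tool(**args)  # complete, non-streaming execution
    yield result                     # single Message-like chunk

async def demo():
    async def echo_tool(text):
        return f"echo: {text}"
    return [chunk async for chunk in stream_tool_result(echo_tool, {"text": "hi"})]

chunks = asyncio.run(demo())
```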
Source code in pyagenity/graph/tool_node/base.py
Modules¶
compiled_graph
¶
Classes:
Name | Description |
---|---|
CompiledGraph | A fully compiled and executable graph ready for workflow execution. |
Attributes:
Name | Type | Description |
---|---|---|
StateT | | |
logger | | |
Attributes¶
Classes¶
CompiledGraph
¶
A fully compiled and executable graph ready for workflow execution.
CompiledGraph represents the final executable form of a StateGraph after compilation. It encapsulates all the execution logic, handlers, and services needed to run agent workflows. The graph supports both synchronous and asynchronous execution with comprehensive state management, checkpointing, event publishing, and streaming capabilities.
This class is generic over state types to support custom AgentState subclasses, ensuring type safety throughout the execution process.
Key Features:
- Synchronous and asynchronous execution methods
- Real-time streaming with incremental results
- State persistence and checkpointing
- Interrupt and resume capabilities
- Event publishing for monitoring and debugging
- Background task management
- Graceful error handling and recovery
Attributes:
Name | Type | Description |
---|---|---|
_state | | The initial/template state for graph executions. |
_invoke_handler | InvokeHandler[StateT] | Handler for non-streaming graph execution. |
_stream_handler | StreamHandler[StateT] | Handler for streaming graph execution. |
_checkpointer | BaseCheckpointer[StateT] \| None | Optional state persistence backend. |
_publisher | BasePublisher \| None | Optional event publishing backend. |
_store | BaseStore \| None | Optional data storage backend. |
_state_graph | StateGraph[StateT] | Reference to the source StateGraph. |
_interrupt_before | list[str] | Nodes where execution pauses before the node runs. |
_interrupt_after | list[str] | Nodes where execution pauses after the node runs. |
_task_manager | | Manager for background async tasks. |
Example
# After building and compiling a StateGraph
compiled = graph.compile()
# Synchronous execution
result = compiled.invoke({"messages": [Message.text_message("Hello")]})
# Asynchronous execution with streaming
async for chunk in compiled.astream({"messages": [message]}):
print(f"Streamed: {chunk.content}")
# Graceful cleanup
await compiled.aclose()
Note
CompiledGraph instances should be properly closed using aclose() to release resources like database connections, background tasks, and event publishers.
Methods:
Name | Description |
---|---|
__init__ | |
aclose | Close the graph and release any resources. |
ainvoke | Execute the graph asynchronously. |
astop | Request the current graph execution to stop (async). |
astream | Execute the graph asynchronously with streaming support. |
generate_graph | Generate the graph representation. |
invoke | Execute the graph synchronously and return the final results. |
stop | Request the current graph execution to stop (sync helper). |
stream | Execute the graph synchronously with streaming support. |
Source code in pyagenity/graph/compiled_graph.py
Functions¶
__init__
¶
__init__(state, checkpointer, publisher, store, state_graph, interrupt_before, interrupt_after, task_manager)
Source code in pyagenity/graph/compiled_graph.py
aclose
async
¶
aclose()
Close the graph and release any resources.
Source code in pyagenity/graph/compiled_graph.py
ainvoke
async
¶
ainvoke(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously.
Auto-detects whether to start fresh execution or resume from interrupted state based on the AgentState's execution metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | dict[str, Any] | Input dict with a 'messages' key (for new execution) or additional data for resuming. | required |
config | dict[str, Any] \| None | Configuration dictionary. | None |
response_granularity | ResponseGranularity | Response parsing granularity. | LOW |
Returns:
Type | Description |
---|---|
dict[str, Any] | Response dict based on granularity. |
Source code in pyagenity/graph/compiled_graph.py
astop
async
¶
astop(config)
Request the current graph execution to stop (async).
Contract:
- Requires a valid thread_id in config
- If there is no active thread or no checkpointer, returns not-running
- If state exists and is running, sets the stop_requested flag in the thread info
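That contract can be modeled with a plain dict standing in for the checkpointer's thread store. This is an assumed sketch of the documented behavior, not the library's implementation:

```python
# Sketch of the astop() contract: validate thread_id, check whether the
# thread is running, and if so set stop_requested for handlers to observe.
def request_stop(thread_store, config):
    thread_id = config.get("thread_id")
    if thread_id is None:
        return {"status": "error", "reason": "thread_id required"}
    info = thread_store.get(thread_id)
    if info is None or not info.get("running"):
        return {"status": "not-running"}
    info["stop_requested"] = True  # handlers poll this flag between steps
    return {"status": "stop-requested"}

store = {"t1": {"running": True}}
```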
Source code in pyagenity/graph/compiled_graph.py
astream
async
¶
astream(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with streaming support.
Yields Message objects containing incremental responses. If nodes return streaming responses, yields them directly. If nodes return complete responses, simulates streaming by chunking.
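The "simulated streaming by chunking" fallback mentioned above boils down to slicing a complete response into fixed-size pieces. A minimal sketch (chunk size is illustrative, not the library's actual value):

```python
# Sketch of simulated streaming: slice a complete node response into
# fixed-size chunks that can be yielded incrementally to the caller.
def chunk_response(text, size=8):
    return [text[i:i + size] for i in range(0, len(text), size)]
```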
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | dict[str, Any] | Input dict. | required |
config | dict[str, Any] \| None | Configuration dictionary. | None |
response_granularity | ResponseGranularity | Response parsing granularity. | LOW |
Yields:
Type | Description |
---|---|
AsyncIterator[Message] | Message objects with incremental content. |
Source code in pyagenity/graph/compiled_graph.py
generate_graph
¶
generate_graph()
Generate the graph representation.
Returns:
Type | Description |
---|---|
dict[str, Any] | A dictionary representing the graph structure. |
Source code in pyagenity/graph/compiled_graph.py
invoke
¶
invoke(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph synchronously and return the final results.
Runs the complete graph workflow from start to finish, handling state management, node execution, and result formatting. This method automatically detects whether to start a fresh execution or resume from an interrupted state.
The execution is synchronous but internally uses async operations, making it suitable for use in non-async contexts while still benefiting from async capabilities for I/O operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | dict[str, Any] | Input dictionary for graph execution. For new executions, should contain a 'messages' key with a list of initial messages. For resumed executions, can contain additional data to merge. | required |
config | dict[str, Any] \| None | Optional configuration dictionary containing execution settings: user_id (identifier for the user/session), thread_id (unique identifier for this execution thread), run_id (unique identifier for this specific run), recursion_limit (maximum steps before stopping, default 25). | None |
response_granularity | ResponseGranularity | Level of detail in the response: LOW returns only messages (default); PARTIAL returns context, summary, and messages; FULL returns complete state and messages. | LOW |
Returns:
Type | Description |
---|---|
dict[str, Any] | Dictionary containing execution results formatted according to the specified granularity level. Always includes execution messages and may include additional state information. |
Raises:
Type | Description |
---|---|
ValueError | If input_data is invalid for new execution. |
GraphRecursionError | If execution exceeds the recursion limit. |
Various exceptions | Depending on node execution failures. |
Example
# Basic execution
result = compiled.invoke({"messages": [Message.text_message("Process this data")]})
print(result["messages"]) # Final execution messages
# With configuration and full details
result = compiled.invoke(
input_data={"messages": [message]},
config={"user_id": "user123", "thread_id": "session456", "recursion_limit": 50},
response_granularity=ResponseGranularity.FULL,
)
print(result["state"]) # Complete final state
Note
This method uses asyncio.run() internally, so it should not be called from within an async context. Use ainvoke() instead for async execution.
Source code in pyagenity/graph/compiled_graph.py
stop
¶
stop(config)
Request the current graph execution to stop (sync helper).
This sets a stop flag in the checkpointer's thread store keyed by thread_id. Handlers periodically check this flag and interrupt execution. Returns a small status dict.
Source code in pyagenity/graph/compiled_graph.py
stream
¶
stream(input_data, config=None, response_granularity=ResponseGranularity.LOW)
Execute the graph synchronously with streaming support.
Yields Message objects containing incremental responses. If nodes return streaming responses, yields them directly. If nodes return complete responses, simulates streaming by chunking.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | dict[str, Any] | Input dict. | required |
config | dict[str, Any] \| None | Configuration dictionary. | None |
response_granularity | ResponseGranularity | Response parsing granularity. | LOW |
Yields:
Type | Description |
---|---|
Generator[Message] | Message objects with incremental content. |
Source code in pyagenity/graph/compiled_graph.py
edge
¶
Graph edge representation and routing logic for PyAgenity workflows.
This module defines the Edge class, which represents connections between nodes in a PyAgenity graph workflow. Edges can be either static (always followed) or conditional (followed only when certain conditions are met), enabling complex routing logic and decision-making within graph execution.
Edges are fundamental building blocks that define the flow of execution through a graph, determining which node should execute next based on the current state and any conditional logic.
Classes:
Name | Description |
---|---|
Edge | Represents a connection between two nodes in a graph workflow. |
Attributes:
Name | Type | Description |
---|---|---|
logger | | |
Attributes¶
Classes¶
Edge
¶
Represents a connection between two nodes in a graph workflow.
An Edge defines the relationship and routing logic between nodes, specifying how execution should flow from one node to another. Edges can be either static (unconditional) or conditional based on runtime state evaluation.
Edges support complex routing scenarios including:
- Simple static connections between nodes
- Conditional routing based on state evaluation
- Dynamic routing with multiple possible destinations
- Decision trees and branching logic
Attributes:
Name | Type | Description |
---|---|---|
from_node | | Name of the source node where execution originates. |
to_node | | Name of the destination node where execution continues. |
condition | | Optional callable that determines if this edge should be followed. If None, the edge is always followed (static edge). |
condition_result | str \| None | Optional value to match against the condition result for mapped conditional edges. |
Example

# Static edge - always followed
static_edge = Edge("start", "process")

# Conditional edge - followed only if condition returns True
def needs_approval(state):
    return state.data.get("requires_approval", False)

conditional_edge = Edge("process", "approval", condition=needs_approval)

# Mapped conditional edge - follows based on specific condition result
def get_priority(state):
    return state.data.get("priority", "normal")

high_priority_edge = Edge("triage", "urgent", condition=get_priority)
high_priority_edge.condition_result = "high"
Methods:

Name | Description |
---|---|
__init__ | Initialize a new Edge with source, destination, and optional condition. |
Source code in pyagenity/graph/edge.py
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
|
Attributes¶
Functions¶
__init__
¶__init__(from_node, to_node, condition=None)
Initialize a new Edge with source, destination, and optional condition.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
from_node | str | Name of the source node. Must match a node name in the graph. | required |
to_node | str | Name of the destination node. Must match a node name in the graph or be a special constant like END. | required |
condition | Callable \| None | Optional callable that takes an AgentState as argument and returns a value to determine if this edge should be followed. If None, this is a static edge that's always followed. | None |
Note
The condition function should be deterministic and side-effect free for predictable execution behavior. It receives the current AgentState and should return a boolean (for simple conditions) or a string/value (for mapped conditional routing).
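To make the boolean vs. mapped contract concrete, here is a minimal, self-contained sketch of edge selection (a simplified stand-in for illustration, not PyAgenity's actual Edge class or routing code):

```python
class Edge:
    """Standalone stand-in for PyAgenity's Edge, for illustration only."""
    def __init__(self, from_node, to_node, condition=None, condition_result=None):
        self.from_node = from_node
        self.to_node = to_node
        self.condition = condition
        self.condition_result = condition_result

def next_node(edges, current, state):
    """Return the destination of the first matching edge out of `current`."""
    for edge in edges:
        if edge.from_node != current:
            continue
        if edge.condition is None:
            return edge.to_node  # static edge: always followed
        result = edge.condition(state)
        if edge.condition_result is not None:
            if result == edge.condition_result:  # mapped conditional edge
                return edge.to_node
        elif result:  # boolean conditional edge
            return edge.to_node
    return None

edges = [
    Edge("triage", "urgent",
         condition=lambda s: s.get("priority"), condition_result="high"),
    Edge("triage", "normal"),  # static fallback
]
```

With this ordering, a high-priority state routes to "urgent" and anything else falls through to the static "normal" edge.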
Source code in pyagenity/graph/edge.py
node
¶
Node execution and management for PyAgenity graph workflows.
This module defines the Node class, which represents executable units within a PyAgenity graph workflow. Nodes encapsulate functions or ToolNode instances that perform specific tasks, handle dependency injection, manage execution context, and support both synchronous and streaming execution modes.
Nodes are the fundamental building blocks of graph workflows, responsible for processing state, executing business logic, and producing outputs that drive the workflow forward. They integrate seamlessly with PyAgenity's dependency injection system and callback management framework.
Classes:

Name | Description |
---|---|
Node | Represents a node in the graph workflow. |

Attributes:

Name | Type | Description |
---|---|---|
logger | | |
Attributes¶
Classes¶
Node
¶
Represents a node in the graph workflow.
A Node encapsulates a function or ToolNode that can be executed as part of a graph workflow. It handles dependency injection, parameter mapping, and execution context management.
The Node class supports both regular callable functions and ToolNode instances for handling tool-based operations. It automatically injects dependencies based on function signatures and provides legacy parameter support.
Attributes:

Name | Type | Description |
---|---|---|
name | str | Unique identifier for the node within the graph. |
func | Union[Callable, ToolNode] | The function or ToolNode to execute. |
Example

def my_function(state, config):
    ...
    return {"result": "processed"}

node = Node("processor", my_function)
result = await node.execute(config, state)
Methods:

Name | Description |
---|---|
__init__ | Initialize a new Node instance with function and dependencies. |
execute | Execute the node function with comprehensive context and callback support. |
stream | Execute the node function with streaming output support. |
Source code in pyagenity/graph/node.py
Attributes¶
Functions¶
__init__
¶__init__(name, func, publisher=Inject[BasePublisher])
Initialize a new Node instance with function and dependencies.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | Unique identifier for the node within the graph. This name is used for routing, logging, and referencing the node in graph configuration. | required |
func | Union[Callable, ToolNode] | The function or ToolNode to execute when this node is called. Functions should accept at least 'state' and 'config' parameters. ToolNode instances handle tool-based operations and provide their own execution logic. | required |
publisher | BasePublisher \| None | Optional event publisher for execution monitoring. Injected via dependency injection if not explicitly provided. Used for publishing node execution events and status updates. | Inject[BasePublisher] |
Note
The function signature is automatically analyzed to determine required parameters and dependency injection points. Parameters matching injectable service names will be automatically provided by the framework during execution.
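The signature analysis can be pictured with a small, self-contained sketch (the service registry and names here are assumptions for illustration, not PyAgenity's actual container):

```python
import inspect

# Hypothetical service registry; names and values are assumptions.
SERVICES = {"config": {"thread_id": "t1"}}

def call_with_injection(func, state):
    """Pass `state` plus any parameter whose name matches a known service."""
    kwargs = {}
    for name in inspect.signature(func).parameters:
        if name == "state":
            kwargs["state"] = state
        elif name in SERVICES:
            kwargs[name] = SERVICES[name]
    return func(**kwargs)

def summarize(state, config):
    # 'config' is resolved from the registry; 'state' is passed through.
    return {"summary": f"{len(state['messages'])} messages",
            "thread": config["thread_id"]}

out = call_with_injection(summarize, {"messages": [1, 2, 3]})
```

The real framework inspects the function the same way, so a node function only declares the parameters it actually needs.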
Source code in pyagenity/graph/node.py
execute
async
¶execute(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with comprehensive context and callback support.
Executes the node's function or ToolNode with full dependency injection, callback hook execution, and error handling. This method provides the complete execution environment including state access, configuration, and injected services.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
config | dict[str, Any] | Configuration dictionary containing execution context, user settings, thread identification, and runtime parameters. | required |
state | AgentState | Current AgentState providing workflow context, message history, and shared state information accessible to the node function. | required |
callback_mgr | CallbackManager | Callback manager for executing pre/post execution hooks. Injected via dependency injection if not explicitly provided. | Inject[CallbackManager] |
Returns:

Type | Description |
---|---|
dict[str, Any] \| list[Message] | Either a dictionary containing updated state and execution results, or a list of Message objects representing the node's output. The return type depends on the node function's implementation. |
Example

# Node function that returns messages
def process_data(state, config):
    result = process(state.data)
    return [Message.text_message(f"Processed: {result}")]

node = Node("processor", process_data)
messages = await node.execute(config, state)
Note
The node function receives dependency-injected parameters based on its signature. Common injectable parameters include 'state', 'config', 'context_manager', 'publisher', and other framework services.
Source code in pyagenity/graph/node.py
stream
async
¶stream(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with streaming output support.
Similar to execute() but designed for streaming scenarios where the node function can produce incremental results. This method provides an async iterator interface over the node's outputs, allowing for real-time processing and response streaming.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
config | dict[str, Any] | Configuration dictionary with execution context and settings. | required |
state | AgentState | Current AgentState providing workflow context and shared state. | required |
callback_mgr | CallbackManager | Callback manager for pre/post execution hook handling. | Inject[CallbackManager] |
Yields:

Type | Description |
---|---|
AsyncIterable[dict[str, Any] \| Message] | Dictionary objects or Message instances representing incremental outputs from the node function. The exact type and frequency of yields depends on the node function's streaming implementation. |
Example

async def streaming_processor(state, config):
    for item in large_dataset:
        result = process_item(item)
        yield Message.text_message(f"Processed item: {result}")

node = Node("stream_processor", streaming_processor)
async for output in node.stream(config, state):
    print(f"Streamed: {output.content}")
Note
Not all node functions support streaming. For non-streaming functions, this method will yield a single result equivalent to calling execute(). The streaming capability is determined by the node function's implementation.
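The single-result fallback described above can be sketched as follows (a simplified assumption of the behavior, not PyAgenity's internal code):

```python
import asyncio
import inspect

async def stream_node(func, state, config):
    """Yield from an async-generator node, else yield its single result."""
    if inspect.isasyncgenfunction(func):
        async for chunk in func(state, config):
            yield chunk
    else:
        result = func(state, config)
        if inspect.isawaitable(result):
            result = await result
        yield result  # non-streaming fallback: one chunk, same as execute()

def plain(state, config):
    return {"result": "done"}

async def collect():
    return [chunk async for chunk in stream_node(plain, {}, {})]

chunks = asyncio.run(collect())
```

A non-streaming node thus produces exactly one chunk, so callers can use the streaming API uniformly.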
Source code in pyagenity/graph/node.py
state_graph
¶
Classes:

Name | Description |
---|---|
StateGraph | Main graph class for orchestrating multi-agent workflows. |

Attributes:

Name | Type | Description |
---|---|---|
StateT | | |
logger | | |
Attributes¶
Classes¶
StateGraph
¶
Main graph class for orchestrating multi-agent workflows.
This class provides the core functionality for building and managing stateful agent workflows. It is similar to LangGraph's StateGraph, with added support for dependency injection.
The graph is generic over state types to support custom AgentState subclasses, allowing for type-safe state management throughout the workflow execution.
Attributes:

Name | Type | Description |
---|---|---|
state | StateT | The current state of the graph workflow. |
nodes | dict[str, Node] | Collection of nodes in the graph. |
edges | list[Edge] | Collection of edges connecting nodes. |
entry_point | str \| None | Name of the starting node for execution. |
context_manager | BaseContextManager[StateT] \| None | Optional context manager for handling cross-node state operations. |
dependency_container | DependencyContainer | Container for managing dependencies that can be injected into node functions. |
compiled | bool | Whether the graph has been compiled for execution. |
Example

graph = StateGraph()
graph.add_node("process", process_function)
graph.add_edge(START, "process")
graph.add_edge("process", END)
compiled = graph.compile()
result = compiled.invoke({"input": "data"})
Methods:

Name | Description |
---|---|
__init__ | Initialize a new StateGraph instance. |
add_conditional_edges | Add conditional routing between nodes based on runtime evaluation. |
add_edge | Add a static edge between two nodes. |
add_node | Add a node to the graph. |
compile | Compile the graph for execution. |
set_entry_point | Set the entry point for the graph. |
Source code in pyagenity/graph/state_graph.py
Attributes¶
Functions¶
__init__
¶__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None, thread_name_generator=None)
Initialize a new StateGraph instance.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
state | StateT \| None | Initial state for the graph. If None, a default AgentState will be created. | None |
context_manager | BaseContextManager[StateT] \| None | Optional context manager for handling cross-node state operations and advanced state management patterns. | None |
container | | Container for managing dependencies that can be injected into node functions. If None, a new empty container will be created. | None |
publisher | BasePublisher \| None | Publisher for emitting events during execution. | None |
Note
START and END nodes are automatically added to the graph upon initialization and accept the full node signature including dependencies.
Example

# Basic usage with default AgentState
graph = StateGraph()

# With custom state
custom_state = MyCustomState()
graph = StateGraph(custom_state)

# Or using type hints for clarity
graph = StateGraph[MyCustomState]()
Source code in pyagenity/graph/state_graph.py
add_conditional_edges
¶add_conditional_edges(from_node, condition, path_map=None)
Add conditional routing between nodes based on runtime evaluation.
Creates dynamic routing logic where the next node is determined by evaluating a condition function against the current state. This enables complex branching logic, decision trees, and adaptive workflow routing.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
from_node | str | Name of the source node where the condition is evaluated. | required |
condition | Callable | Callable function that takes the current AgentState and returns a value used for routing decisions. Should be deterministic and side-effect free. | required |
path_map | dict[str, str] \| None | Optional dictionary mapping condition results to destination nodes. If provided, the condition's return value is looked up in this mapping. If None, the condition should return the destination node name directly. | None |
Returns:

Name | Type | Description |
---|---|---|
StateGraph | StateGraph | The graph instance for method chaining. |

Raises:

Type | Description |
---|---|
ValueError | If the condition function or path_map configuration is invalid. |
Example

# Direct routing - condition returns node name
def route_by_priority(state):
    priority = state.data.get("priority", "normal")
    return "urgent_handler" if priority == "high" else "normal_handler"

graph.add_conditional_edges("classifier", route_by_priority)

# Mapped routing - condition result mapped to nodes
def get_category(state):
    return state.data.get("category", "default")

category_map = {
    "finance": "finance_processor",
    "legal": "legal_processor",
    "default": "general_processor",
}
graph.add_conditional_edges("categorizer", get_category, category_map)
Note
The condition function receives the current AgentState and should return consistent results for the same state. If using path_map, ensure the condition's return values match the map keys exactly.
Source code in pyagenity/graph/state_graph.py
add_edge
¶add_edge(from_node, to_node)
Add a static edge between two nodes.
Creates a direct connection from one node to another. If the source node is START, the target node becomes the entry point for the graph.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
from_node | str | Name of the source node. | required |
to_node | str | Name of the target node. | required |
Returns:

Name | Type | Description |
---|---|---|
StateGraph | StateGraph | The graph instance for method chaining. |
Example

graph.add_edge("node1", "node2")
graph.add_edge(START, "entry_node")  # Sets entry point
Source code in pyagenity/graph/state_graph.py
add_node
¶add_node(name_or_func, func=None)
Add a node to the graph.
This method supports two calling patterns:

1. Pass a callable as the first argument (name inferred from the function name)
2. Pass a name string and a callable/ToolNode as separate arguments
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name_or_func | str \| Callable | Either the node name (str) or a callable function. If callable, the function name will be used as the node name. | required |
func | Union[Callable, ToolNode, None] | The function or ToolNode to execute. Required if name_or_func is a string, ignored if name_or_func is callable. | None |
Returns:

Name | Type | Description |
---|---|---|
StateGraph | StateGraph | The graph instance for method chaining. |

Raises:

Type | Description |
---|---|
ValueError | If invalid arguments are provided. |
Example

# Method 1: Function name inferred
graph.add_node(my_function)

# Method 2: Explicit name and function
graph.add_node("process", my_function)
Source code in pyagenity/graph/state_graph.py
compile
¶compile(checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Compile the graph for execution.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
checkpointer | BaseCheckpointer[StateT] \| None | Checkpointer for state persistence. | None |
store | BaseStore \| None | Store for additional data. | None |
interrupt_before | list[str] \| None | List of node names to interrupt before execution. | None |
interrupt_after | list[str] \| None | List of node names to interrupt after execution. | None |
callback_manager | CallbackManager | Callback manager for executing hooks. | CallbackManager() |
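The interrupt_before/interrupt_after semantics can be sketched with a minimal, self-contained runner (an illustrative assumption, not PyAgenity's real compiled-graph implementation):

```python
def run(nodes, order, state, interrupt_before=(), interrupt_after=()):
    """Run nodes in order, pausing when an interrupt point is hit."""
    for name in order:
        if name in interrupt_before:
            # Pause before executing this node, e.g. for human approval.
            return {"paused_before": name, "state": state}
        state = nodes[name](state)
        if name in interrupt_after:
            # Pause after this node, with its output already applied.
            return {"paused_after": name, "state": state}
    return {"done": True, "state": state}

nodes = {
    "classify": lambda s: {**s, "category": "legal"},
    "approval": lambda s: {**s, "approved": True},
}
out = run(nodes, ["classify", "approval"], {"input": "doc"},
          interrupt_before={"approval"})
```

Here execution runs "classify" normally, then pauses before "approval", mirroring how a compiled graph would hand control back for a human-in-the-loop step.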
Source code in pyagenity/graph/state_graph.py
set_entry_point
¶set_entry_point(node_name)
Set the entry point for the graph.
Source code in pyagenity/graph/state_graph.py
Functions¶
tool_node
¶
ToolNode package.
This package provides a modularized implementation of ToolNode. Public API:
- ToolNode
- HAS_FASTMCP, HAS_MCP
Backwards-compatible import path: from pyagenity.graph.tool_node import ToolNode
Modules:

Name | Description |
---|---|
base | Tool execution node for PyAgenity graph workflows. |
constants | Constants for ToolNode package. |
deps | Dependency flags and optional imports for ToolNode. |
executors | Executors for different tool providers and local functions. |
schema | Schema utilities and local tool description building for ToolNode. |

Classes:

Name | Description |
---|---|
ToolNode | A unified registry and executor for callable functions from various tool providers. |

Attributes:

Name | Type | Description |
---|---|---|
HAS_FASTMCP | | |
HAS_MCP | | |
Attributes¶
Classes¶
ToolNode
¶
Bases: SchemaMixin, LocalExecMixin, MCPMixin, ComposioMixin, LangChainMixin, KwargsResolverMixin
A unified registry and executor for callable functions from various tool providers.
ToolNode serves as the central hub for managing and executing tools from multiple sources:

- Local Python functions
- MCP (Model Context Protocol) tools
- Composio adapter tools
- LangChain tools
The class uses a mixin-based architecture to separate concerns and maintain clean integration with different tool providers. It provides both synchronous and asynchronous execution methods with comprehensive event publishing and error handling.
Attributes:

Name | Type | Description |
---|---|---|
_funcs | dict[str, Callable] | Dictionary mapping function names to callable functions. |
_client | Client \| None | Optional MCP client for remote tool execution. |
_composio | ComposioAdapter \| None | Optional Composio adapter for external integrations. |
_langchain | Any \| None | Optional LangChain adapter for LangChain tools. |
mcp_tools | list[str] | List of available MCP tool names. |
composio_tools | list[str] | List of available Composio tool names. |
langchain_tools | list[str] | List of available LangChain tool names. |
Example

# Define local tools
def weather_tool(location: str) -> str:
    return f"Weather in {location}: Sunny, 25°C"

def calculator(a: int, b: int) -> int:
    return a + b

# Create ToolNode with local functions
tools = ToolNode([weather_tool, calculator])

# Execute a tool
result = await tools.invoke(
    name="weather_tool",
    args={"location": "New York"},
    tool_call_id="call_123",
    config={"user_id": "user1"},
    state=agent_state,
)
Methods:

Name | Description |
---|---|
__init__ | Initialize ToolNode with functions and optional tool adapters. |
all_tools | Get all available tools from all configured providers. |
all_tools_sync | Synchronously get all available tools from all configured providers. |
get_local_tool | Generate OpenAI-compatible tool definitions for all registered local functions. |
invoke | Execute a specific tool by name with the provided arguments. |
stream | Execute a tool with streaming support, yielding incremental results. |
Source code in pyagenity/graph/tool_node/base.py
Attributes¶
Functions¶
__init__
¶__init__(functions, client=None, composio_adapter=None, langchain_adapter=None)
Initialize ToolNode with functions and optional tool adapters.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
functions | Iterable[Callable] | Iterable of callable functions to register as tools. Each function is registered under its function name. | required |
client | Client \| None | Optional MCP (Model Context Protocol) client for remote tool access. Requires 'fastmcp' and 'mcp' packages to be installed. | None |
composio_adapter | ComposioAdapter \| None | Optional Composio adapter for external integrations and third-party API access. | None |
langchain_adapter | Any \| None | Optional LangChain adapter for accessing LangChain tools and integrations. | None |
Raises:

Type | Description |
---|---|
ImportError | If MCP client is provided but required packages are not installed. |
TypeError | If any item in functions is not callable. |
Note
When using MCP client functionality, ensure you have installed the required
dependencies with: pip install pyagenity[mcp]
Source code in pyagenity/graph/tool_node/base.py
all_tools
async
¶all_tools()
Get all available tools from all configured providers.
Retrieves and combines tool definitions from local functions, MCP client, Composio adapter, and LangChain adapter. Each tool definition includes the function schema with parameters and descriptions.
Returns:

Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. Each dict contains 'type': 'function' and 'function' with name, description, and parameters schema. |
Example

tools = await tool_node.all_tools()
# Returns:
# [
#     {
#         "type": "function",
#         "function": {
#             "name": "weather_tool",
#             "description": "Get weather information for a location",
#             "parameters": {
#                 "type": "object",
#                 "properties": {
#                     "location": {"type": "string"}
#                 },
#                 "required": ["location"]
#             }
#         }
#     }
# ]
Source code in pyagenity/graph/tool_node/base.py
all_tools_sync
¶all_tools_sync()
Synchronously get all available tools from all configured providers.
This is a synchronous wrapper around the async all_tools() method. It uses asyncio.run() to handle async operations from MCP, Composio, and LangChain adapters.
Returns:

Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. |
Note
Prefer the async all_tools() method when possible, especially in async contexts, to avoid potential event loop issues.
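The wrapper pattern, and why it is unsafe inside a running loop, can be shown with a self-contained sketch (the tool list here is a stand-in, not real adapter output):

```python
import asyncio

async def all_tools():
    """Async variant; stands in for adapter calls that must be awaited."""
    return [{"type": "function", "function": {"name": "calculator"}}]

def all_tools_sync():
    # asyncio.run() spins up a fresh event loop for the async call.
    # Calling this from code already running inside an event loop raises
    # RuntimeError, which is the "event loop issue" the note warns about.
    return asyncio.run(all_tools())

tools = all_tools_sync()
```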
Source code in pyagenity/graph/tool_node/base.py
get_local_tool
¶get_local_tool()
Generate OpenAI-compatible tool definitions for all registered local functions.
Inspects all registered functions in _funcs and automatically generates tool schemas by analyzing function signatures, type annotations, and docstrings. Excludes injectable parameters that are provided by the framework.
Returns:

Type | Description |
---|---|
list[dict] | List of tool definitions in OpenAI function calling format. Each definition includes the function name, description (from docstring), and complete parameter schema with types and required fields. |
Example

For a function:

def calculate(a: int, b: int, operation: str = "add") -> int:
    '''Perform arithmetic calculation.'''
    return a + b if operation == "add" else a - b

Returns:

[
    {
        "type": "function",
        "function": {
            "name": "calculate",
            "description": "Perform arithmetic calculation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "integer"},
                    "b": {"type": "integer"},
                    "operation": {"type": "string", "default": "add"},
                },
                "required": ["a", "b"],
            },
        },
    }
]
Note
Parameters listed in INJECTABLE_PARAMS (like 'state', 'config', 'tool_call_id') are automatically excluded from the generated schema as they are provided by the framework during execution.
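A simplified version of this schema generation can be written with the standard inspect module (an illustrative assumption, much reduced from PyAgenity's real schema builder):

```python
import inspect

# Assumed injectable names, mirroring the INJECTABLE_PARAMS idea above.
INJECTABLE = {"state", "config", "tool_call_id"}
TYPE_MAP = {int: "integer", str: "string", float: "number", bool: "boolean"}

def build_schema(func):
    """Build an OpenAI-style tool schema, skipping injected parameters."""
    props, required = {}, []
    for name, p in inspect.signature(func).parameters.items():
        if name in INJECTABLE:
            continue  # provided by the framework, hidden from the model
        props[name] = {"type": TYPE_MAP.get(p.annotation, "string")}
        if p.default is inspect.Parameter.empty:
            required.append(name)
        else:
            props[name]["default"] = p.default
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props,
                           "required": required},
        },
    }

def calculate(a: int, b: int, operation: str = "add", state=None) -> int:
    """Perform arithmetic calculation."""
    return a + b if operation == "add" else a - b

schema = build_schema(calculate)
```

Note how the `state` parameter disappears from the generated schema even though it is part of the Python signature.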
Source code in pyagenity/graph/tool_node/schema.py
invoke
async
¶invoke(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a specific tool by name with the provided arguments.
This method handles tool execution across all configured providers (local, MCP, Composio, LangChain) with comprehensive error handling, event publishing, and callback management.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the tool to execute. | required |
args | dict | Dictionary of arguments to pass to the tool function. | required |
tool_call_id | str | Unique identifier for this tool execution, used for tracking and result correlation. | required |
config | dict[str, Any] | Configuration dictionary containing execution context and user-specific settings. | required |
state | AgentState | Current agent state for context-aware tool execution. | required |
callback_manager | CallbackManager | Manager for executing pre/post execution callbacks. Injected via dependency injection if not provided. | Inject[CallbackManager] |
Returns:

Type | Description |
---|---|
Any | Message object containing tool execution results, either successful output or error information with appropriate status indicators. |
Example
result = await tool_node.invoke(
    name="weather_tool",
    args={"location": "Paris", "units": "metric"},
    tool_call_id="call_abc123",
    config={"user_id": "user1", "session_id": "session1"},
    state=current_agent_state,
)
# result is a Message with tool execution results
print(result.content)  # Tool output or error information
Note
The method publishes execution events throughout the process for monitoring and debugging purposes. Tool execution is routed based on tool provider precedence: MCP → Composio → LangChain → Local.
Source code in pyagenity/graph/tool_node/base.py
stream (async) ¶
stream(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a tool with streaming support, yielding incremental results.
Similar to invoke() but designed for tools that can provide streaming responses or when you want to process results as they become available. Currently, most tool providers return complete results, so this method typically yields a single Message with the full result.
Parameters:
- name (str, required): The name of the tool to execute.
- args (dict, required): Dictionary of arguments to pass to the tool function.
- tool_call_id (str, required): Unique identifier for this tool execution.
- config (dict[str, Any], required): Configuration dictionary containing execution context.
- state (AgentState, required): Current agent state for context-aware tool execution.
- callback_manager (CallbackManager, default Inject[CallbackManager]): Manager for executing pre/post execution callbacks.
Yields:
- AsyncIterator[Message]: Message objects containing tool execution results or status updates. For most tools, this will yield a single complete result Message.
Example
async for message in tool_node.stream(
    name="data_processor",
    args={"dataset": "large_data.csv"},
    tool_call_id="call_stream123",
    config={"user_id": "user1"},
    state=current_state,
):
    print(f"Received: {message.content}")
    # Process each streamed result
Note
The streaming interface is designed for future expansion where tools may provide true streaming responses. Currently, it provides a consistent async iterator interface over tool results.
Source code in pyagenity/graph/tool_node/base.py
Modules¶
base
¶
Tool execution node for PyAgenity graph workflows.
This module provides the ToolNode class, which serves as a unified registry and executor for callable functions from various sources including local functions, MCP (Model Context Protocol) tools, Composio adapters, and LangChain tools. The ToolNode is designed with a modular architecture using mixins to handle different tool providers.
The ToolNode maintains compatibility with PyAgenity's dependency injection system and publishes execution events for monitoring and debugging purposes.
Typical usage example
def my_tool(query: str) -> str:
    return f"Result for: {query}"

tools = ToolNode([my_tool])
result = await tools.invoke("my_tool", {"query": "test"}, "call_id", config, state)
Classes:
- ToolNode: A unified registry and executor for callable functions from various tool providers.

Attributes:
- logger
Attributes¶
Classes¶
ToolNode ¶
Bases: SchemaMixin, LocalExecMixin, MCPMixin, ComposioMixin, LangChainMixin, KwargsResolverMixin
A unified registry and executor for callable functions from various tool providers.
ToolNode serves as the central hub for managing and executing tools from multiple sources:
- Local Python functions
- MCP (Model Context Protocol) tools
- Composio adapter tools
- LangChain tools
The class uses a mixin-based architecture to separate concerns and maintain clean integration with different tool providers. It provides both synchronous and asynchronous execution methods with comprehensive event publishing and error handling.
Attributes:
- _funcs (dict[str, Callable]): Dictionary mapping function names to callable functions.
- _client (Client | None): Optional MCP client for remote tool execution.
- _composio (ComposioAdapter | None): Optional Composio adapter for external integrations.
- _langchain (Any | None): Optional LangChain adapter for LangChain tools.
- mcp_tools (list[str]): List of available MCP tool names.
- composio_tools (list[str]): List of available Composio tool names.
- langchain_tools (list[str]): List of available LangChain tool names.
Example
# Define local tools
def weather_tool(location: str) -> str:
    return f"Weather in {location}: Sunny, 25°C"

def calculator(a: int, b: int) -> int:
    return a + b

# Create ToolNode with local functions
tools = ToolNode([weather_tool, calculator])

# Execute a tool
result = await tools.invoke(
    name="weather_tool",
    args={"location": "New York"},
    tool_call_id="call_123",
    config={"user_id": "user1"},
    state=agent_state,
)
Methods:
- __init__: Initialize ToolNode with functions and optional tool adapters.
- all_tools: Get all available tools from all configured providers.
- all_tools_sync: Synchronously get all available tools from all configured providers.
- get_local_tool: Generate OpenAI-compatible tool definitions for all registered local functions.
- invoke: Execute a specific tool by name with the provided arguments.
- stream: Execute a tool with streaming support, yielding incremental results.
Source code in pyagenity/graph/tool_node/base.py
__init__ ¶
__init__(functions, client=None, composio_adapter=None, langchain_adapter=None)
Initialize ToolNode with functions and optional tool adapters.
Parameters:
- functions (Iterable[Callable], required): Iterable of callable functions to register as tools. Each function is registered under its __name__.
- client (Client | None, default None): Optional MCP (Model Context Protocol) client for remote tool access. Requires 'fastmcp' and 'mcp' packages to be installed.
- composio_adapter (ComposioAdapter | None, default None): Optional Composio adapter for external integrations and third-party API access.
- langchain_adapter (Any | None, default None): Optional LangChain adapter for accessing LangChain tools and integrations.
Raises:
- ImportError: If MCP client is provided but required packages are not installed.
- TypeError: If any item in functions is not callable.
Note
When using MCP client functionality, ensure you have installed the required dependencies with: pip install pyagenity[mcp]
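The registration behavior described above (callables keyed by name, with a TypeError for non-callable items) can be sketched as follows. This is an illustrative re-implementation under stated assumptions, not PyAgenity's actual code:

```python
from collections.abc import Callable, Iterable


def register_functions(functions: Iterable[Callable]) -> dict[str, Callable]:
    """Register callables by name, mirroring the validation described above.

    Illustrative sketch only; ToolNode's real __init__ also wires up
    optional adapters and dependency injection.
    """
    registry: dict[str, Callable] = {}
    for fn in functions:
        if not callable(fn):
            # The docs state a TypeError is raised for non-callable items
            raise TypeError(f"Expected a callable, got {type(fn).__name__}")
        registry[fn.__name__] = fn
    return registry


def greet(name: str) -> str:
    return f"Hello, {name}"


tools = register_functions([greet])
```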
Source code in pyagenity/graph/tool_node/base.py
all_tools (async) ¶
all_tools()
Get all available tools from all configured providers.
Retrieves and combines tool definitions from local functions, MCP client, Composio adapter, and LangChain adapter. Each tool definition includes the function schema with parameters and descriptions.
Returns:
- list[dict]: List of tool definitions in OpenAI function calling format. Each dict contains 'type': 'function' and 'function' with name, description, and parameters schema.
Example
tools = await tool_node.all_tools()
# Returns:
# [
# {
# "type": "function",
# "function": {
# "name": "weather_tool",
# "description": "Get weather information for a location",
# "parameters": {
# "type": "object",
# "properties": {
# "location": {"type": "string"}
# },
# "required": ["location"]
# }
# }
# }
# ]
Source code in pyagenity/graph/tool_node/base.py
all_tools_sync ¶
all_tools_sync()
Synchronously get all available tools from all configured providers.
This is a synchronous wrapper around the async all_tools() method. It uses asyncio.run() to handle async operations from MCP, Composio, and LangChain adapters.
Returns:
- list[dict]: List of tool definitions in OpenAI function calling format.
Note
Prefer the async all_tools() method when possible, especially in async contexts, to avoid potential event loop issues.
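The sync-over-async wrapper pattern described here can be sketched as below; the class and tool names are illustrative, not PyAgenity's implementation:

```python
import asyncio


class ToolRegistry:
    """Minimal sketch of the async/sync method pair described above."""

    async def all_tools(self) -> list[dict]:
        # The real ToolNode gathers tools from local, MCP, Composio,
        # and LangChain providers; we return a fixed definition here.
        return [{"type": "function", "function": {"name": "demo"}}]

    def all_tools_sync(self) -> list[dict]:
        # asyncio.run() fails if called from a running event loop,
        # which is why the async variant is preferred in async contexts.
        return asyncio.run(self.all_tools())


tools = ToolRegistry().all_tools_sync()
```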
Source code in pyagenity/graph/tool_node/base.py
get_local_tool ¶
get_local_tool()
Generate OpenAI-compatible tool definitions for all registered local functions.
Inspects all registered functions in _funcs and automatically generates tool schemas by analyzing function signatures, type annotations, and docstrings. Excludes injectable parameters that are provided by the framework.
Returns:
- list[dict]: List of tool definitions in OpenAI function calling format. Each definition includes the function name, description (from docstring), and complete parameter schema with types and required fields.
Example
For a function:

def calculate(a: int, b: int, operation: str = "add") -> int:
    '''Perform arithmetic calculation.'''
    return a + b if operation == "add" else a - b

Returns:

[
    {
        "type": "function",
        "function": {
            "name": "calculate",
            "description": "Perform arithmetic calculation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "integer"},
                    "b": {"type": "integer"},
                    "operation": {"type": "string", "default": "add"},
                },
                "required": ["a", "b"],
            },
        },
    }
]
Note
Parameters listed in INJECTABLE_PARAMS (like 'state', 'config', 'tool_call_id') are automatically excluded from the generated schema as they are provided by the framework during execution.
Source code in pyagenity/graph/tool_node/schema.py
invoke (async) ¶
invoke(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a specific tool by name with the provided arguments.
This method handles tool execution across all configured providers (local, MCP, Composio, LangChain) with comprehensive error handling, event publishing, and callback management.
Parameters:
- name (str, required): The name of the tool to execute.
- args (dict, required): Dictionary of arguments to pass to the tool function.
- tool_call_id (str, required): Unique identifier for this tool execution, used for tracking and result correlation.
- config (dict[str, Any], required): Configuration dictionary containing execution context and user-specific settings.
- state (AgentState, required): Current agent state for context-aware tool execution.
- callback_manager (CallbackManager, default Inject[CallbackManager]): Manager for executing pre/post execution callbacks. Injected via dependency injection if not provided.
Returns:
- Any: Message object containing tool execution results, either successful output or error information with appropriate status indicators.
Example
result = await tool_node.invoke(
    name="weather_tool",
    args={"location": "Paris", "units": "metric"},
    tool_call_id="call_abc123",
    config={"user_id": "user1", "session_id": "session1"},
    state=current_agent_state,
)
# result is a Message with tool execution results
print(result.content)  # Tool output or error information
Note
The method publishes execution events throughout the process for monitoring and debugging purposes. Tool execution is routed based on tool provider precedence: MCP → Composio → LangChain → Local.
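The provider precedence noted above (MCP → Composio → LangChain → Local) amounts to a first-match dispatch over the registered tool names. A minimal sketch, with hypothetical names (the real ToolNode routes to async executor mixins):

```python
def route_tool(
    name: str,
    mcp: set[str],
    composio: set[str],
    langchain: set[str],
    local: set[str],
) -> str:
    """Return which provider handles `name`, checking in precedence order."""
    if name in mcp:
        return "mcp"
    if name in composio:
        return "composio"
    if name in langchain:
        return "langchain"
    if name in local:
        return "local"
    raise KeyError(f"Unknown tool: {name}")


# A tool registered both remotely and locally resolves to the remote provider
provider = route_tool("weather_tool", {"weather_tool"}, set(), set(), {"weather_tool"})
```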
Source code in pyagenity/graph/tool_node/base.py
stream (async) ¶
stream(name, args, tool_call_id, config, state, callback_manager=Inject[CallbackManager])
Execute a tool with streaming support, yielding incremental results.
Similar to invoke() but designed for tools that can provide streaming responses or when you want to process results as they become available. Currently, most tool providers return complete results, so this method typically yields a single Message with the full result.
Parameters:
- name (str, required): The name of the tool to execute.
- args (dict, required): Dictionary of arguments to pass to the tool function.
- tool_call_id (str, required): Unique identifier for this tool execution.
- config (dict[str, Any], required): Configuration dictionary containing execution context.
- state (AgentState, required): Current agent state for context-aware tool execution.
- callback_manager (CallbackManager, default Inject[CallbackManager]): Manager for executing pre/post execution callbacks.
Yields:
- AsyncIterator[Message]: Message objects containing tool execution results or status updates. For most tools, this will yield a single complete result Message.
Example
async for message in tool_node.stream(
    name="data_processor",
    args={"dataset": "large_data.csv"},
    tool_call_id="call_stream123",
    config={"user_id": "user1"},
    state=current_state,
):
    print(f"Received: {message.content}")
    # Process each streamed result
Note
The streaming interface is designed for future expansion where tools may provide true streaming responses. Currently, it provides a consistent async iterator interface over tool results.
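The single-yield contract described in the note — a complete result exposed through an async-iterator interface — can be sketched as a thin wrapper. All names here are illustrative:

```python
import asyncio
from collections.abc import AsyncIterator


async def stream_result(run_tool, args: dict) -> AsyncIterator[str]:
    """Expose a complete tool result through the streaming contract.

    Illustrative sketch: most providers return one complete result,
    so a single yield keeps the async-iterator interface consistent.
    """
    result = await run_tool(**args)
    yield result


async def add(a: int, b: int) -> str:
    return f"sum={a + b}"


async def collect() -> list[str]:
    return [chunk async for chunk in stream_result(add, {"a": 2, "b": 3})]


chunks = asyncio.run(collect())
```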
Source code in pyagenity/graph/tool_node/base.py
Functions¶
Modules¶
constants
¶
Constants for ToolNode package.
This module defines constants used throughout the ToolNode implementation, particularly parameter names that are automatically injected by the PyAgenity framework during tool execution. These parameters are excluded from tool schema generation since they are provided by the execution context.
The constants are separated into their own module to avoid circular imports and maintain a clean public API.
Parameter names that are automatically injected during tool execution.
These parameters are provided by the PyAgenity framework and should be excluded from tool schema generation. They represent execution context and framework services that are available to tool functions but not provided by the user.
The injected parameters include (the first three correspond to 'tool_call_id', 'state', and 'config'):
- tool_call_id: Unique identifier for the current tool execution.
- state: Current AgentState instance for context-aware execution.
- config: Configuration dictionary with execution settings.
- A framework-generated identifier for various purposes.
- A BaseContextManager instance for cross-node operations.
- A BasePublisher instance for event publishing.
- A BaseCheckpointer instance for state persistence.
- A BaseStore instance for data storage operations.
Note
Tool functions can declare these parameters in their signatures to receive the corresponding services, but they should not be included in the tool schema since they're not user-provided arguments.
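The split between user-provided arguments and framework-injected services can be sketched as below. The helper name and the reduced INJECTABLE_PARAMS set are illustrative assumptions, not PyAgenity's actual code:

```python
import inspect

# Subset of injectable names, for illustration only
INJECTABLE_PARAMS = {"state", "config", "tool_call_id"}


def merge_args(fn, user_args: dict, framework_services: dict) -> dict:
    """Combine user args with injected services a tool actually declares.

    Only parameters present in the function signature receive injected
    values; undeclared services are simply not passed.
    """
    kwargs = dict(user_args)
    for name in inspect.signature(fn).parameters:
        if name in INJECTABLE_PARAMS and name in framework_services:
            kwargs[name] = framework_services[name]
    return kwargs


def my_tool(query: str, state=None, tool_call_id=None) -> str:
    return f"{query}:{tool_call_id}"


kwargs = merge_args(my_tool, {"query": "q"}, {"tool_call_id": "call_1", "config": {}})
```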
Attributes:
- INJECTABLE_PARAMS
deps
¶
Dependency flags and optional imports for ToolNode.
This module manages optional third-party dependencies for the ToolNode implementation, providing clean import handling and feature flags. It isolates optional imports to prevent ImportError cascades when optional dependencies are not installed.
The module handles two main optional dependency groups:
1. MCP (Model Context Protocol) support via 'fastmcp' and 'mcp' packages
2. Future extensibility for other optional tool providers
By centralizing optional imports here, other modules can safely import the flags and types without triggering ImportError exceptions, allowing graceful degradation when optional features are not available.
Typical usage
from .deps import HAS_FASTMCP, HAS_MCP, Client

if HAS_FASTMCP and HAS_MCP:
    # Use MCP functionality
    client = Client(...)
else:
    # Graceful fallback or error message
    client = None
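The guarded import that produces such flags typically follows the pattern below; this is a generic sketch of the technique, not the module's exact code:

```python
# Optional-import guard: the flag records availability, and the name is
# bound to None so other modules can import it without an ImportError.
try:
    import fastmcp  # noqa: F401  (optional dependency)
    HAS_FASTMCP = True
except ImportError:
    fastmcp = None
    HAS_FASTMCP = False
```

Downstream code then checks the boolean flag instead of re-attempting the import, which keeps ImportError handling in one place.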
Attributes:
- HAS_FASTMCP (bool): True if the 'fastmcp' package is installed and imports successfully.
- HAS_MCP (bool): True if the 'mcp' package is installed and imports successfully.
- Client: FastMCP Client class for connecting to MCP servers; None if FastMCP is not available.
- Result type for MCP tool executions; None if FastMCP is not available.
executors
¶
Executors for different tool providers and local functions.
Classes:
- ComposioMixin
- KwargsResolverMixin
- LangChainMixin
- LocalExecMixin
- MCPMixin

Attributes:
- logger
Attributes¶
Classes¶
ComposioMixin ¶

Attributes:
- composio_tools (list[str])
Source code in pyagenity/graph/tool_node/executors.py
KwargsResolverMixin ¶
Source code in pyagenity/graph/tool_node/executors.py
LangChainMixin ¶

Attributes:
- langchain_tools (list[str])
Source code in pyagenity/graph/tool_node/executors.py
LocalExecMixin ¶
Source code in pyagenity/graph/tool_node/executors.py
MCPMixin ¶

Attributes:
- mcp_tools (list[str])
Source code in pyagenity/graph/tool_node/executors.py
Functions¶
schema
¶
Schema utilities and local tool description building for ToolNode.
This module provides the SchemaMixin class which handles automatic schema generation for local Python functions, converting their type annotations and signatures into OpenAI-compatible function schemas. It supports various Python types including primitives, Optional types, List types, and Literal enums.
The schema generation process inspects function signatures and converts them to JSON Schema format suitable for use with language models and function calling APIs.
Classes:
- SchemaMixin: Mixin providing schema generation and local tool description building.
Attributes¶
Classes¶
SchemaMixin ¶
Mixin providing schema generation and local tool description building.
This mixin provides functionality to automatically generate JSON Schema definitions from Python function signatures. It handles type annotation conversion, parameter analysis, and OpenAI-compatible function schema generation for local tools.
The mixin is designed to be used with ToolNode to automatically generate tool schemas without requiring manual schema definition for Python functions.
Attributes:
- _funcs (dict[str, Callable]): Dictionary mapping function names to callable functions. This attribute is expected to be provided by the class using this mixin.
Methods:
- get_local_tool: Generate OpenAI-compatible tool definitions for all registered local functions.
Source code in pyagenity/graph/tool_node/schema.py
get_local_tool ¶
get_local_tool()
Generate OpenAI-compatible tool definitions for all registered local functions.
Inspects all registered functions in _funcs and automatically generates tool schemas by analyzing function signatures, type annotations, and docstrings. Excludes injectable parameters that are provided by the framework.
Returns:
- list[dict]: List of tool definitions in OpenAI function calling format. Each definition includes the function name, description (from docstring), and complete parameter schema with types and required fields.
Example
For a function:

def calculate(a: int, b: int, operation: str = "add") -> int:
    '''Perform arithmetic calculation.'''
    return a + b if operation == "add" else a - b

Returns:

[
    {
        "type": "function",
        "function": {
            "name": "calculate",
            "description": "Perform arithmetic calculation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "a": {"type": "integer"},
                    "b": {"type": "integer"},
                    "operation": {"type": "string", "default": "add"},
                },
                "required": ["a", "b"],
            },
        },
    }
]
Note
Parameters listed in INJECTABLE_PARAMS (like 'state', 'config', 'tool_call_id') are automatically excluded from the generated schema as they are provided by the framework during execution.
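The signature-to-schema conversion described above can be sketched in a few lines. This minimal version handles only primitive annotations and a reduced injectable set; the real SchemaMixin also supports Optional, list, and Literal types:

```python
import inspect

TYPE_MAP = {int: "integer", str: "string", float: "number", bool: "boolean"}
INJECTABLE_PARAMS = {"state", "config", "tool_call_id"}  # subset, for illustration


def build_schema(fn) -> dict:
    """Generate an OpenAI-style function schema from a Python signature."""
    props, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        if name in INJECTABLE_PARAMS:
            continue  # provided by the framework, excluded from the schema
        spec = {"type": TYPE_MAP.get(param.annotation, "string")}
        if param.default is not inspect.Parameter.empty:
            spec["default"] = param.default
        else:
            required.append(name)
        props[name] = spec
    return {
        "type": "function",
        "function": {
            "name": fn.__name__,
            "description": (fn.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props, "required": required},
        },
    }


def calculate(a: int, b: int, operation: str = "add") -> int:
    """Perform arithmetic calculation."""
    return a + b if operation == "add" else a - b


schema = build_schema(calculate)
```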
Source code in pyagenity/graph/tool_node/schema.py
utils
¶
Modules:
- handler_mixins: Shared mixins for graph and node handler classes.
- invoke_handler
- invoke_node_handler: InvokeNodeHandler utilities for PyAgenity agent graph execution.
- stream_handler: Streaming graph execution handler for PyAgenity workflows.
- stream_node_handler: Streaming node handler for PyAgenity graph workflows.
- stream_utils: Streaming utility functions for PyAgenity graph workflows.
- utils: Core utility functions for graph execution and state management.
Modules¶
handler_mixins
¶
Shared mixins for graph and node handler classes.
This module provides lightweight mixins that add common functionality to handler classes without changing their core runtime behavior. The mixins follow the composition pattern to keep responsibilities explicit and allow handlers to inherit only the capabilities they need.
The mixins provide structured logging, configuration management, and other cross-cutting concerns that are commonly needed across different handler types. By using mixins, the core handler logic remains focused while gaining these shared capabilities.
Typical usage
class MyHandler(BaseLoggingMixin, InterruptConfigMixin):
    def __init__(self):
        self._set_interrupts(["node1"], ["node2"])
        self._log_start("Handler initialized")
Classes:
- BaseLoggingMixin: Provides structured logging helpers for handler classes.
- InterruptConfigMixin: Manages interrupt configuration for graph-level execution handlers.
Classes¶
BaseLoggingMixin ¶
Provides structured logging helpers for handler classes.
This mixin adds consistent logging capabilities to handler classes without requiring them to manage logger instances directly. It automatically creates loggers based on the module name and provides convenience methods for common logging operations.
The mixin is designed to be lightweight and non-intrusive, adding only logging functionality without affecting the core behavior of the handler.
Attributes:
- _logger (Logger): Cached logger instance for the handler class.
Example
class MyHandler(BaseLoggingMixin):
def process(self):
self._log_start("Processing started")
try:
# Do work
self._log_debug("Work completed successfully")
except Exception as e:
self._log_error("Processing failed: %s", e)
Source code in pyagenity/graph/utils/handler_mixins.py
InterruptConfigMixin
¶Manages interrupt configuration for graph-level execution handlers.
This mixin provides functionality to store and manage interrupt points configuration for graph execution. Interrupts allow graph execution to be paused before or after specific nodes for debugging, human intervention, or checkpoint creation.
The mixin maintains separate lists for "before" and "after" interrupts, allowing fine-grained control over when graph execution should pause.
Attributes:

Name | Type | Description
---|---|---
interrupt_before | list[str] \| None | List of node names where execution should pause before node execution begins.
interrupt_after | list[str] \| None | List of node names where execution should pause after node execution completes.

Example

    class GraphHandler(InterruptConfigMixin):
        def __init__(self):
            self._set_interrupts(
                interrupt_before=["approval_node"], interrupt_after=["data_processing"]
            )
Source code in pyagenity/graph/utils/handler_mixins.py
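The composition pattern the two mixins follow can be sketched in a few self-contained lines. This is a hypothetical approximation for illustration, not PyAgenity's actual implementation; the method names (`_set_interrupts`, `_log_start`) follow the usage shown above.

```python
import logging


class BaseLoggingMixin:
    """Sketch of a logging mixin: a lazily cached, module-named logger."""

    @property
    def _logger(self) -> logging.Logger:
        # Create the logger once, named after the handler's module.
        if not hasattr(self, "_cached_logger"):
            self._cached_logger = logging.getLogger(type(self).__module__)
        return self._cached_logger

    def _log_start(self, msg: str, *args) -> None:
        self._logger.info(msg, *args)

    def _log_error(self, msg: str, *args) -> None:
        self._logger.error(msg, *args)


class InterruptConfigMixin:
    """Sketch of interrupt bookkeeping for graph-level handlers."""

    def _set_interrupts(self, interrupt_before=None, interrupt_after=None) -> None:
        # Store pause points; None/empty means "never pause".
        self.interrupt_before = interrupt_before or []
        self.interrupt_after = interrupt_after or []


class MyHandler(BaseLoggingMixin, InterruptConfigMixin):
    def __init__(self):
        self._set_interrupts(interrupt_before=["node1"], interrupt_after=["node2"])
        self._log_start("Handler initialized")
```

Because each mixin touches a disjoint set of attributes, a handler can inherit only the capabilities it needs without any cooperative `__init__` chain.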
invoke_handler
¶
Classes:

Name | Description
---|---
InvokeHandler |

Attributes:

Name | Type | Description
---|---|---
StateT | |
logger | |
Attributes¶
Classes¶
InvokeHandler
¶
Bases: BaseLoggingMixin, InterruptConfigMixin

Methods:

Name | Description
---|---
__init__ |
invoke | Execute the graph asynchronously with event publishing.

Attributes:

Name | Type | Description
---|---|---
edges | list[Edge] |
interrupt_after | |
interrupt_before | |
nodes | dict[str, Node] |
Source code in pyagenity/graph/utils/invoke_handler.py
__init__
¶__init__(nodes, edges, interrupt_before=None, interrupt_after=None)
Source code in pyagenity/graph/utils/invoke_handler.py
invoke
async
¶invoke(input_data, config, default_state, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with event publishing.
Source code in pyagenity/graph/utils/invoke_handler.py
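To make the invoke flow concrete, here is a miniature, self-contained sketch of what a graph-level invoke loop does: walk edges from an entry point to an END sentinel, threading state through each node. All names here (MiniInvokeHandler, END, the node functions) are invented for illustration and are not PyAgenity's API, which also handles interrupts, checkpointing, and event publishing.

```python
import asyncio

END = "__end__"  # sentinel marking graph termination


class MiniInvokeHandler:
    def __init__(self, nodes, edges):
        self.nodes = nodes  # name -> async callable(state) -> state
        self.edges = edges  # name -> name of the next node

    async def invoke(self, state, entry_point):
        current = entry_point
        while current != END:
            # Execute the current node, then follow its static edge.
            state = await self.nodes[current](state)
            current = self.edges[current]
        return state


async def add_one(state):
    state["value"] += 1
    return state


async def double(state):
    state["value"] *= 2
    return state


handler = MiniInvokeHandler(
    nodes={"add": add_one, "double": double},
    edges={"add": "double", "double": END},
)
result = asyncio.run(handler.invoke({"value": 1}, "add"))
# result["value"] == 4  (1 + 1, then * 2)
```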
Functions¶
invoke_node_handler
¶
InvokeNodeHandler utilities for PyAgenity agent graph execution.
This module provides the InvokeNodeHandler class, which manages the invocation of node functions and tool nodes within the agent graph. It supports dependency injection, callback hooks, event publishing, and error recovery for both regular and tool-based nodes.
Classes:
Name | Description |
---|---|
InvokeNodeHandler |
Handles execution of node functions and tool nodes with DI and callbacks. |
Usage
    handler = InvokeNodeHandler(name, func, publisher)
    result = await handler.invoke(config, state)
Attributes:

Name | Type | Description
---|---|---
logger | |
Attributes¶
Classes¶
InvokeNodeHandler
¶
Bases: BaseLoggingMixin
Handles invocation of node functions and tool nodes in the agent graph.
Supports dependency injection, callback hooks, event publishing, and error recovery.
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | Name of the node. | required
func | Callable \| ToolNode | The function or ToolNode to execute. | required
publisher | BasePublisher | Event publisher for execution events. | Inject[BasePublisher]

Methods:

Name | Description
---|---
__init__ |
clear_signature_cache | Clear the function signature cache. Useful for testing or memory management.
invoke | Execute the node function or ToolNode with dependency injection and callback hooks.

Attributes:

Name | Type | Description
---|---|---
func | |
name | |
publisher | |
Source code in pyagenity/graph/utils/invoke_node_handler.py
__init__
¶__init__(name, func, publisher=Inject[BasePublisher])
Source code in pyagenity/graph/utils/invoke_node_handler.py
clear_signature_cache
classmethod
¶clear_signature_cache()
Clear the function signature cache. Useful for testing or memory management.
Source code in pyagenity/graph/utils/invoke_node_handler.py
invoke
async
¶invoke(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function or ToolNode with dependency injection and callback hooks.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | dict | Node configuration. | required
state | AgentState | Current agent state. | required
callback_mgr | CallbackManager | Callback manager for hooks. | Inject[CallbackManager]

Returns:

Type | Description
---|---
dict[str, Any] \| list[Message] | Result of node execution (regular node or tool node).

Raises:

Type | Description
---|---
NodeError | If execution fails or context is missing for tool nodes.
Source code in pyagenity/graph/utils/invoke_node_handler.py
Functions¶
stream_handler
¶
Streaming graph execution handler for PyAgenity workflows.
This module provides the StreamHandler class, which manages the execution of graph workflows with support for streaming output, interrupts, state persistence, and event publishing. It enables incremental result processing, pause/resume capabilities, and robust error handling for agent workflows that require real-time or chunked responses.
Classes:
Name | Description |
---|---|
StreamHandler |
Handles streaming execution for graph workflows in PyAgenity. |
Attributes:

Name | Type | Description
---|---|---
StateT | |
logger | |
Attributes¶
Classes¶
StreamHandler
¶
Bases: BaseLoggingMixin, InterruptConfigMixin
Handles streaming execution for graph workflows in PyAgenity.
StreamHandler manages the execution of agent workflows as directed graphs, supporting streaming output, pause/resume via interrupts, state persistence, and event publishing for monitoring and debugging. It enables incremental result processing and robust error handling for complex agent workflows.
Attributes:

Name | Type | Description
---|---|---
nodes | dict[str, Node] | Dictionary mapping node names to Node instances.
edges | list[Edge] | List of Edge instances defining graph connections and routing.
interrupt_before | | List of node names where execution should pause before execution.
interrupt_after | | List of node names where execution should pause after execution.

Example

    handler = StreamHandler(nodes, edges)
    async for chunk in handler.stream(input_data, config, state):
        print(chunk)

Methods:

Name | Description
---|---
__init__ |
stream | Execute the graph asynchronously with streaming output.
Source code in pyagenity/graph/utils/stream_handler.py
__init__
¶__init__(nodes, edges, interrupt_before=None, interrupt_after=None)
Source code in pyagenity/graph/utils/stream_handler.py
stream
async
¶stream(input_data, config, default_state, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with streaming output.
Runs the graph workflow from start to finish, yielding incremental results as they become available. Automatically detects whether to start a fresh execution or resume from an interrupted state, supporting pause/resume and checkpointing.
Parameters:

Name | Type | Description | Default
---|---|---|---
input_data | dict[str, Any] | Input dictionary for graph execution. For new executions, should contain a 'messages' key with initial messages. For resumed executions, can contain additional data to merge. | required
config | dict[str, Any] | Configuration dictionary containing execution settings and context. | required
default_state | StateT | Initial or template AgentState for workflow execution. | required
response_granularity | ResponseGranularity | Level of detail in the response (LOW, PARTIAL, FULL). | LOW

Yields:

Type | Description
---|---
AsyncGenerator[Message] | Message objects representing incremental results from graph execution. The exact type and frequency of yields depends on node implementations and workflow configuration.

Raises:

Type | Description
---|---
GraphRecursionError | If execution exceeds the recursion limit.
ValueError | If input_data is invalid for a new execution.
Various exceptions | Depending on node execution failures.
Example

    async for chunk in handler.stream(input_data, config, state):
        print(chunk)
Source code in pyagenity/graph/utils/stream_handler.py
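The pause/resume behavior described above can be illustrated with a miniature, self-contained streaming loop: each node is an async generator whose chunks are forwarded, and execution stops early when the next node appears in interrupt_before. The names here (MiniStreamHandler, END, the node generators) are invented for illustration and do not mirror PyAgenity's real signatures, which also take config, state, and a checkpointer.

```python
import asyncio
from typing import AsyncGenerator

END = "__end__"  # sentinel marking graph termination


class MiniStreamHandler:
    def __init__(self, nodes, edges, interrupt_before=None):
        self.nodes = nodes  # name -> async generator factory
        self.edges = edges  # name -> next node name
        self.interrupt_before = interrupt_before or []

    async def stream(self, entry_point) -> AsyncGenerator[str, None]:
        current = entry_point
        while current != END:
            if current in self.interrupt_before:
                # Pause point hit: a real handler would persist state here.
                yield f"[interrupted before {current}]"
                return
            async for chunk in self.nodes[current]():
                yield chunk
            current = self.edges[current]


async def greeter():
    for word in ("hello", "world"):
        yield word


async def approver():
    yield "approved"


handler = MiniStreamHandler(
    nodes={"greet": greeter, "approve": approver},
    edges={"greet": "approve", "approve": END},
    interrupt_before=["approve"],
)


async def collect():
    return [chunk async for chunk in handler.stream("greet")]


chunks = asyncio.run(collect())
# chunks == ["hello", "world", "[interrupted before approve]"]
```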
Functions¶
stream_node_handler
¶
Streaming node handler for PyAgenity graph workflows.
This module provides the StreamNodeHandler class, which manages the execution of graph nodes that support streaming output. It handles both regular function nodes and ToolNode instances, enabling incremental result processing, dependency injection, callback management, and event publishing.
StreamNodeHandler is a key component for enabling real-time, chunked, or incremental responses in agent workflows, supporting both synchronous and asynchronous execution patterns.
Classes:

Name | Description
---|---
StreamNodeHandler | Handles streaming execution for graph nodes in PyAgenity workflows.

Attributes:

Name | Type | Description
---|---|---
logger | |
Attributes¶
Classes¶
StreamNodeHandler
¶
Bases: BaseLoggingMixin
Handles streaming execution for graph nodes in PyAgenity workflows.
StreamNodeHandler manages the execution of nodes that can produce streaming output, including both regular function nodes and ToolNode instances. It supports dependency injection, callback management, event publishing, and incremental result processing.
Attributes:

Name | Type | Description
---|---|---
name | | Unique identifier for the node within the graph.
func | | The function or ToolNode to execute. Determines streaming behavior.

Example

    handler = StreamNodeHandler("process", process_function)
    async for chunk in handler.stream(config, state):
        print(chunk)

Methods:

Name | Description
---|---
__init__ | Initialize a new StreamNodeHandler instance.
stream | Execute the node function with streaming output and callback support.
Source code in pyagenity/graph/utils/stream_node_handler.py
__init__
¶__init__(name, func)
Initialize a new StreamNodeHandler instance.
Parameters:

Name | Type | Description | Default
---|---|---|---
name | str | Unique identifier for the node within the graph. | required
func | Union[Callable, ToolNode] | The function or ToolNode to execute. Determines streaming behavior. | required
Source code in pyagenity/graph/utils/stream_node_handler.py
stream
async
¶stream(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with streaming output and callback support.
Handles both ToolNode and regular function nodes, yielding incremental results as they become available. Supports dependency injection, callback management, and event publishing for monitoring and debugging.
Parameters:

Name | Type | Description | Default
---|---|---|---
config | dict[str, Any] | Configuration dictionary containing execution context and settings. | required
state | AgentState | Current AgentState providing workflow context and shared state. | required
callback_mgr | CallbackManager | Callback manager for pre/post execution hook handling. | Inject[CallbackManager]

Yields:

Type | Description
---|---
AsyncGenerator[dict[str, Any] \| Message] | Dictionary objects or Message instances representing incremental outputs from the node function. The exact type and frequency of yields depends on the node function's streaming implementation.

Raises:

Type | Description
---|---
NodeError | If node execution fails or encounters an error.
Example

    async for chunk in handler.stream(config, state):
        print(chunk)
Source code in pyagenity/graph/utils/stream_node_handler.py
Functions¶
stream_utils
¶
Streaming utility functions for PyAgenity graph workflows.
This module provides helper functions for determining whether a result from a node or tool execution should be treated as non-streaming (i.e., a complete result) or processed incrementally as a stream. These utilities are used throughout the graph execution engine to support both synchronous and streaming workflows.
Functions:

Name | Description
---|---
check_non_streaming | Determine if a result should be treated as non-streaming.
Classes¶
Functions¶
check_non_streaming
¶check_non_streaming(result)
Determine if a result should be treated as non-streaming.
Checks whether the given result is a complete, non-streaming output (such as a list, dict, string, Message, or AgentState) or if it should be processed incrementally as a stream.
Parameters:

Name | Type | Description | Default
---|---|---|---
result | | The result object returned from a node or tool execution. Can be any type. | required

Returns:

Name | Type | Description
---|---|---
bool | bool | True if the result is non-streaming and should be processed as a complete output; False if the result should be handled as a stream.

Example

    >>> check_non_streaming([Message.text_message("done")])
    True
    >>> check_non_streaming(Message.text_message("done"))
    True
    >>> check_non_streaming({"choices": [...]})
    True
    >>> check_non_streaming("some text")
    True
Source code in pyagenity/graph/utils/stream_utils.py
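A self-contained approximation of this check might look like the following. It is a sketch under the assumption that generators and async generators are the streaming cases, while complete containers and scalars are not; the real function also recognizes PyAgenity-specific types such as Message and AgentState, which are omitted here.

```python
import inspect


def check_non_streaming_sketch(result) -> bool:
    """Approximate the non-streaming check described above (assumption:
    generator-like results are streams; container/scalar results are not)."""
    # Generators and async generators are streams by definition.
    if inspect.isgenerator(result) or inspect.isasyncgen(result):
        return False
    # Complete container or scalar results are treated as final output.
    return isinstance(result, (list, dict, str))


assert check_non_streaming_sketch(["done"]) is True
assert check_non_streaming_sketch({"choices": []}) is True
assert check_non_streaming_sketch("some text") is True
assert check_non_streaming_sketch(x for x in range(3)) is False
```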
utils
¶
Core utility functions for graph execution and state management.
This module provides essential utilities for PyAgenity graph execution, including state management, message processing, response formatting, and execution flow control. These functions handle the low-level operations that support graph workflow execution.
The utilities in this module are designed to work with PyAgenity's dependency injection system and provide consistent interfaces for common operations across different execution contexts.
Key functionality areas:

- State loading, creation, and synchronization
- Message processing and deduplication
- Response formatting based on granularity levels
- Node execution result processing
- Interrupt handling and execution flow control
Functions:

Name | Description
---|---
call_realtime_sync | Call the realtime state sync hook if provided.
check_and_handle_interrupt | Check for interrupts and save state if needed. Returns True if interrupted.
get_next_node | Get the next node to execute based on edges.
load_or_create_state | Load existing state from checkpointer or create new state.
parse_response | Parse and format execution response based on specified granularity level.
process_node_result | Process the result from a node execution, updating the agent state, message list, and determining the next node.
reload_state | Load existing state from checkpointer or create new state.
sync_data | Sync the current state and messages to the checkpointer.

Attributes:

Name | Type | Description
---|---|---
StateT | |
logger | |
Attributes¶
Classes¶
Functions¶
call_realtime_sync
async
¶call_realtime_sync(state, config, checkpointer=Inject[BaseCheckpointer])
Call the realtime state sync hook if provided.
Source code in pyagenity/graph/utils/utils.py
check_and_handle_interrupt
async
¶check_and_handle_interrupt(interrupt_before, interrupt_after, current_node, interrupt_type, state, config, _sync_data)
Check for interrupts and save state if needed. Returns True if interrupted.
Source code in pyagenity/graph/utils/utils.py
get_next_node
¶get_next_node(current_node, state, edges)
Get the next node to execute based on edges.
Source code in pyagenity/graph/utils/utils.py
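The edge-based routing this function performs can be sketched with a minimal, self-contained model: static edges always fire, while a conditional edge calls a router function on the state to choose a target. The names here (MiniEdge, get_next_node_sketch, END) are hypothetical and do not reflect PyAgenity's actual Edge class.

```python
END = "__end__"  # sentinel: no further node to execute


class MiniEdge:
    def __init__(self, source, target=None, condition=None):
        self.source = source
        self.target = target        # static target node name
        self.condition = condition  # callable(state) -> target node name


def get_next_node_sketch(current_node, state, edges):
    """Return the next node by scanning outgoing edges of current_node."""
    for edge in edges:
        if edge.source != current_node:
            continue
        if edge.condition is not None:
            # Conditional edge: let the router inspect the state.
            return edge.condition(state)
        return edge.target
    return END  # no outgoing edge: terminate


edges = [
    MiniEdge("fetch", target="route"),
    MiniEdge("route", condition=lambda s: "tools" if s["needs_tools"] else END),
]
```

Conditional edges are what enable the branching logic and decision trees mentioned in the module overview: the same graph can route to a tool node or terminate depending on state.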
load_or_create_state
async
¶load_or_create_state(input_data, config, old_state, checkpointer=Inject[BaseCheckpointer])
Load existing state from checkpointer or create new state.
Attempts to fetch a realtime-synced state first, then falls back to the persistent checkpointer. If no existing state is found, creates a new state from the StateGraph's prototype state and merges any incoming messages. Supports partial state update via 'state' in input_data.
Source code in pyagenity/graph/utils/utils.py
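The fallback chain described above (realtime cache, then persistent checkpointer, then a fresh prototype state) can be sketched in a self-contained way. MiniCheckpointer and load_or_create_sketch are invented names for illustration; the real function works with AgentState objects and dependency-injected checkpointers.

```python
import asyncio


class MiniCheckpointer:
    def __init__(self):
        self.realtime = {}    # thread_id -> recently synced state (fast path)
        self.persistent = {}  # thread_id -> durably stored state

    async def load(self, thread_id):
        # Prefer the realtime-synced copy, then the persistent store.
        if thread_id in self.realtime:
            return self.realtime[thread_id]
        return self.persistent.get(thread_id)


async def load_or_create_sketch(input_data, config, checkpointer, prototype):
    state = await checkpointer.load(config["thread_id"])
    if state is None:
        # No saved state: start from the graph's prototype state.
        state = dict(prototype)
    # Merge incoming messages either way.
    state.setdefault("messages", []).extend(input_data.get("messages", []))
    return state


cp = MiniCheckpointer()
cp.persistent["t1"] = {"messages": ["old"], "step": 3}
state = asyncio.run(
    load_or_create_sketch({"messages": ["new"]}, {"thread_id": "t1"}, cp, {"step": 0})
)
# state == {"messages": ["old", "new"], "step": 3}
```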
parse_response
async
¶parse_response(state, messages, response_granularity=ResponseGranularity.LOW)
Parse and format execution response based on specified granularity level.
Formats the final response from graph execution according to the requested granularity level, allowing clients to receive different levels of detail depending on their needs.
Parameters:

Name | Type | Description | Default
---|---|---|---
state | AgentState | The final agent state after graph execution. | required
messages | list[Message] | List of messages generated during execution. | required
response_granularity | ResponseGranularity | Level of detail to include in the response: FULL returns the complete state object and all messages; PARTIAL returns context, summary, and messages; LOW returns only the messages (default). | LOW

Returns:

Type | Description
---|---
dict[str, Any] | Dictionary containing the formatted response, with keys depending on granularity level. Always includes a 'messages' key with execution results.

Example

    # LOW granularity (default)
    response = await parse_response(state, messages)
    # Returns: {"messages": [Message(...), ...]}

    # FULL granularity
    response = await parse_response(state, messages, ResponseGranularity.FULL)
    # Returns: {"state": AgentState(...), "messages": [Message(...), ...]}
Source code in pyagenity/graph/utils/utils.py
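The granularity-based shaping can be sketched with a plain dict-based model. Granularity and parse_response_sketch are hypothetical stand-ins for PyAgenity's ResponseGranularity enum and parse_response coroutine; the field names mirror the FULL/PARTIAL/LOW behavior described above.

```python
from enum import Enum


class Granularity(Enum):
    FULL = "full"
    PARTIAL = "partial"
    LOW = "low"


def parse_response_sketch(state, messages, granularity=Granularity.LOW):
    """Shape the response according to the requested level of detail."""
    if granularity is Granularity.FULL:
        # Everything: complete state object plus all messages.
        return {"state": state, "messages": messages}
    if granularity is Granularity.PARTIAL:
        # Context and summary only, without internal state fields.
        return {
            "context": state.get("context"),
            "summary": state.get("summary"),
            "messages": messages,
        }
    # LOW: messages only.
    return {"messages": messages}


state = {"context": ["..."], "summary": "done", "internal": 42}
msgs = ["hello"]
```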
process_node_result
async
¶process_node_result(result, state, messages)
Processes the result from a node execution, updating the agent state, message list, and determining the next node.
Supports:

- Handling results of type Command, AgentState, Message, list, str, dict, or other types.
- Deduplicating messages by message_id.
- Updating the agent state and its context with new messages.
- Extracting navigation information (next node) from Command results.
Parameters:

Name | Type | Description | Default
---|---|---|---
result | Any | The output from a node execution. Can be a Command, AgentState, Message, list, str, dict, ModelResponse, or other type. | required
state | StateT | The current agent state. | required
messages | list[Message] | The list of messages accumulated so far. | required

Returns:

Type | Description
---|---
tuple[StateT, list[Message], str \| None] | The updated agent state; the updated list of messages (with new, unique messages added); and the identifier of the next node to execute if specified, otherwise None.
Source code in pyagenity/graph/utils/utils.py
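The deduplication step listed above (messages deduplicated by message_id) can be isolated into a small self-contained sketch. MiniMessage and dedup_messages are hypothetical names for illustration, not PyAgenity's Message class or the internal helper.

```python
from dataclasses import dataclass


@dataclass
class MiniMessage:
    message_id: str
    content: str


def dedup_messages(existing, new):
    """Append only messages whose message_id has not been seen yet."""
    seen = {m.message_id for m in existing}
    merged = list(existing)
    for msg in new:
        if msg.message_id not in seen:
            merged.append(msg)
            seen.add(msg.message_id)
    return merged


history = [MiniMessage("m1", "hi")]
result = dedup_messages(history, [MiniMessage("m1", "hi"), MiniMessage("m2", "ok")])
# result contains exactly two messages: m1 and m2
```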
reload_state
async
¶reload_state(config, old_state, checkpointer=Inject[BaseCheckpointer])
Load existing state from checkpointer or create new state.
Attempts to fetch a realtime-synced state first, then falls back to the persistent checkpointer. If no existing state is found, creates a new state from the StateGraph's prototype state and merges any incoming messages. Supports partial state update via 'state' in input_data.
Source code in pyagenity/graph/utils/utils.py
sync_data
async
¶sync_data(state, config, messages, trim=False, checkpointer=Inject[BaseCheckpointer], context_manager=Inject[BaseContextManager])
Sync the current state and messages to the checkpointer.
Source code in pyagenity/graph/utils/utils.py