Node
Node execution and management for PyAgenity graph workflows.
This module defines the Node class, which represents executable units within a PyAgenity graph workflow. Nodes encapsulate functions or ToolNode instances that perform specific tasks, handle dependency injection, manage execution context, and support both synchronous and streaming execution modes.
Nodes are the fundamental building blocks of graph workflows, responsible for processing state, executing business logic, and producing outputs that drive the workflow forward. They integrate seamlessly with PyAgenity's dependency injection system and callback management framework.
Classes:
Name | Description |
---|---|
Node |
Represents a node in the graph workflow. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
Node
¶
Represents a node in the graph workflow.
A Node encapsulates a function or ToolNode that can be executed as part of a graph workflow. It handles dependency injection, parameter mapping, and execution context management.
The Node class supports both regular callable functions and ToolNode instances for handling tool-based operations. It automatically injects dependencies based on function signatures and provides legacy parameter support.
Attributes:
Name | Type | Description |
---|---|---|
name |
str
|
Unique identifier for the node within the graph. |
func |
Union[Callable, ToolNode]
|
The function or ToolNode to execute. |
Example
def my_function(state, config): ... return {"result": "processed"} node = Node("processor", my_function) result = await node.execute(state, config)
Methods:
Name | Description |
---|---|
__init__ |
Initialize a new Node instance with function and dependencies. |
execute |
Execute the node function with comprehensive context and callback support. |
stream |
Execute the node function with streaming output support. |
Source code in pyagenity/graph/node.py
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
|
Attributes¶
Functions¶
__init__
¶
__init__(name, func, publisher=Inject[BasePublisher])
Initialize a new Node instance with function and dependencies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
str
|
Unique identifier for the node within the graph. This name is used for routing, logging, and referencing the node in graph configuration. |
required |
|
Union[Callable, ToolNode]
|
The function or ToolNode to execute when this node is called. Functions should accept at least 'state' and 'config' parameters. ToolNode instances handle tool-based operations and provide their own execution logic. |
required |
|
BasePublisher | None
|
Optional event publisher for execution monitoring. Injected via dependency injection if not explicitly provided. Used for publishing node execution events and status updates. |
Inject[BasePublisher]
|
Note
The function signature is automatically analyzed to determine required parameters and dependency injection points. Parameters matching injectable service names will be automatically provided by the framework during execution.
Source code in pyagenity/graph/node.py
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
|
execute
async
¶
execute(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with comprehensive context and callback support.
Executes the node's function or ToolNode with full dependency injection, callback hook execution, and error handling. This method provides the complete execution environment including state access, configuration, and injected services.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
dict[str, Any]
|
Configuration dictionary containing execution context, user settings, thread identification, and runtime parameters. |
required |
|
AgentState
|
Current AgentState providing workflow context, message history, and shared state information accessible to the node function. |
required |
|
CallbackManager
|
Callback manager for executing pre/post execution hooks. Injected via dependency injection if not explicitly provided. |
Inject[CallbackManager]
|
Returns:
Type | Description |
---|---|
dict[str, Any] | list[Message]
|
Either a dictionary containing updated state and execution results, |
dict[str, Any] | list[Message]
|
or a list of Message objects representing the node's output. |
dict[str, Any] | list[Message]
|
The return type depends on the node function's implementation. |
Example
# Node function that returns messages
def process_data(state, config):
result = process(state.data)
return [Message.text_message(f"Processed: {result}")]
node = Node("processor", process_data)
messages = await node.execute(config, state)
Note
The node function receives dependency-injected parameters based on its signature. Common injectable parameters include 'state', 'config', 'context_manager', 'publisher', and other framework services.
Source code in pyagenity/graph/node.py
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
|
stream
async
¶
stream(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with streaming output support.
Similar to execute() but designed for streaming scenarios where the node function can produce incremental results. This method provides an async iterator interface over the node's outputs, allowing for real-time processing and response streaming.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
dict[str, Any]
|
Configuration dictionary with execution context and settings. |
required |
|
AgentState
|
Current AgentState providing workflow context and shared state. |
required |
|
CallbackManager
|
Callback manager for pre/post execution hook handling. |
Inject[CallbackManager]
|
Yields:
Type | Description |
---|---|
AsyncIterable[dict[str, Any] | Message]
|
Dictionary objects or Message instances representing incremental |
AsyncIterable[dict[str, Any] | Message]
|
outputs from the node function. The exact type and frequency of |
AsyncIterable[dict[str, Any] | Message]
|
yields depends on the node function's streaming implementation. |
Example
async def streaming_processor(state, config):
for item in large_dataset:
result = process_item(item)
yield Message.text_message(f"Processed item: {result}")
node = Node("stream_processor", streaming_processor)
async for output in node.stream(config, state):
print(f"Streamed: {output.content}")
Note
Not all node functions support streaming. For non-streaming functions, this method will yield a single result equivalent to calling execute(). The streaming capability is determined by the node function's implementation.
Source code in pyagenity/graph/node.py
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
|