Skip to content

Node

Node execution and management for PyAgenity graph workflows.

This module defines the Node class, which represents executable units within a PyAgenity graph workflow. Nodes encapsulate functions or ToolNode instances that perform specific tasks, handle dependency injection, manage execution context, and support both synchronous and streaming execution modes.

Nodes are the fundamental building blocks of graph workflows, responsible for processing state, executing business logic, and producing outputs that drive the workflow forward. They integrate seamlessly with PyAgenity's dependency injection system and callback management framework.

Classes:

Name Description
Node

Represents a node in the graph workflow.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

Node

Represents a node in the graph workflow.

A Node encapsulates a function or ToolNode that can be executed as part of a graph workflow. It handles dependency injection, parameter mapping, and execution context management.

The Node class supports both regular callable functions and ToolNode instances for handling tool-based operations. It automatically injects dependencies based on function signatures and provides legacy parameter support.

Attributes:

Name Type Description
name str

Unique identifier for the node within the graph.

func Union[Callable, ToolNode]

The function or ToolNode to execute.

Example

def my_function(state, config): ... return {"result": "processed"} node = Node("processor", my_function) result = await node.execute(state, config)

Methods:

Name Description
__init__

Initialize a new Node instance with function and dependencies.

execute

Execute the node function with comprehensive context and callback support.

stream

Execute the node function with streaming output support.

Source code in pyagenity/graph/node.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class Node:
    """Represents a node in the graph workflow.

    A Node encapsulates a function or ToolNode that can be executed as part of
    a graph workflow. It handles dependency injection, parameter mapping, and
    execution context management.

    The Node class supports both regular callable functions and ToolNode instances
    for handling tool-based operations. It automatically injects dependencies
    based on function signatures and provides legacy parameter support.

    Attributes:
        name (str): Unique identifier for the node within the graph.
        func (Union[Callable, ToolNode]): The function or ToolNode to execute.

    Example:
        >>> def my_function(state, config):
        ...     return {"result": "processed"}
        >>> node = Node("processor", my_function)
        >>> result = await node.execute(state, config)
    """

    def __init__(
        self,
        name: str,
        func: Union[Callable, "ToolNode"],
        publisher: BasePublisher | None = Inject[BasePublisher],
    ):
        """Initialize a new Node instance with function and dependencies.

        Args:
            name: Unique identifier for the node within the graph. This name
                is used for routing, logging, and referencing the node in
                graph configuration.
            func: The function or ToolNode to execute when this node is called.
                Functions should accept at least 'state' and 'config' parameters.
                ToolNode instances handle tool-based operations and provide
                their own execution logic.
            publisher: Optional event publisher for execution monitoring.
                Injected via dependency injection if not explicitly provided.
                Used for publishing node execution events and status updates.

        Note:
            The function signature is automatically analyzed to determine
            required parameters and dependency injection points. Parameters
            matching injectable service names will be automatically provided
            by the framework during execution.
        """
        logger.debug(
            "Initializing node '%s' with func=%s",
            name,
            getattr(func, "__name__", type(func).__name__),
        )
        self.name = name
        self.func = func
        self.publisher = publisher
        self.invoke_handler = InvokeNodeHandler(
            name,
            func,
        )

        self.stream_handler = StreamNodeHandler(
            name,
            func,
        )

    async def execute(
        self,
        config: dict[str, Any],
        state: AgentState,
        callback_mgr: CallbackManager = Inject[CallbackManager],
    ) -> dict[str, Any] | list[Message]:
        """Execute the node function with comprehensive context and callback support.

        Executes the node's function or ToolNode with full dependency injection,
        callback hook execution, and error handling. This method provides the
        complete execution environment including state access, configuration,
        and injected services.

        Args:
            config: Configuration dictionary containing execution context,
                user settings, thread identification, and runtime parameters.
            state: Current AgentState providing workflow context, message history,
                and shared state information accessible to the node function.
            callback_mgr: Callback manager for executing pre/post execution hooks.
                Injected via dependency injection if not explicitly provided.

        Returns:
            Either a dictionary containing updated state and execution results,
            or a list of Message objects representing the node's output.
            The return type depends on the node function's implementation.

        Raises:
            Various exceptions depending on node function behavior. All exceptions
            are handled by the callback manager's error handling hooks before
            being propagated.

        Example:
            ```python
            # Node function that returns messages
            def process_data(state, config):
                result = process(state.data)
                return [Message.text_message(f"Processed: {result}")]


            node = Node("processor", process_data)
            messages = await node.execute(config, state)
            ```

        Note:
            The node function receives dependency-injected parameters based on
            its signature. Common injectable parameters include 'state', 'config',
            'context_manager', 'publisher', and other framework services.
        """
        return await self.invoke_handler.invoke(
            config,
            state,
            callback_mgr,
        )

    async def stream(
        self,
        config: dict[str, Any],
        state: AgentState,
        callback_mgr: CallbackManager = Inject[CallbackManager],
    ) -> AsyncIterable[dict[str, Any] | Message]:
        """Execute the node function with streaming output support.

        Similar to execute() but designed for streaming scenarios where the node
        function can produce incremental results. This method provides an async
        iterator interface over the node's outputs, allowing for real-time
        processing and response streaming.

        Args:
            config: Configuration dictionary with execution context and settings.
            state: Current AgentState providing workflow context and shared state.
            callback_mgr: Callback manager for pre/post execution hook handling.

        Yields:
            Dictionary objects or Message instances representing incremental
            outputs from the node function. The exact type and frequency of
            yields depends on the node function's streaming implementation.

        Example:
            ```python
            async def streaming_processor(state, config):
                for item in large_dataset:
                    result = process_item(item)
                    yield Message.text_message(f"Processed item: {result}")


            node = Node("stream_processor", streaming_processor)
            async for output in node.stream(config, state):
                print(f"Streamed: {output.content}")
            ```

        Note:
            Not all node functions support streaming. For non-streaming functions,
            this method will yield a single result equivalent to calling execute().
            The streaming capability is determined by the node function's implementation.
        """
        result = self.stream_handler.stream(
            config,
            state,
            callback_mgr,
        )

        async for item in result:
            yield item

Attributes

func instance-attribute
func = func
invoke_handler instance-attribute
invoke_handler = InvokeNodeHandler(name, func)
name instance-attribute
name = name
publisher instance-attribute
publisher = publisher
stream_handler instance-attribute
stream_handler = StreamNodeHandler(name, func)

Functions

__init__
__init__(name, func, publisher=Inject[BasePublisher])

Initialize a new Node instance with function and dependencies.

Parameters:

Name Type Description Default
name
str

Unique identifier for the node within the graph. This name is used for routing, logging, and referencing the node in graph configuration.

required
func
Union[Callable, ToolNode]

The function or ToolNode to execute when this node is called. Functions should accept at least 'state' and 'config' parameters. ToolNode instances handle tool-based operations and provide their own execution logic.

required
publisher
BasePublisher | None

Optional event publisher for execution monitoring. Injected via dependency injection if not explicitly provided. Used for publishing node execution events and status updates.

Inject[BasePublisher]
Note

The function signature is automatically analyzed to determine required parameters and dependency injection points. Parameters matching injectable service names will be automatically provided by the framework during execution.

Source code in pyagenity/graph/node.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(
    self,
    name: str,
    func: Union[Callable, "ToolNode"],
    publisher: BasePublisher | None = Inject[BasePublisher],
):
    """Initialize a new Node instance with function and dependencies.

    Args:
        name: Unique identifier for the node within the graph. This name
            is used for routing, logging, and referencing the node in
            graph configuration.
        func: The function or ToolNode to execute when this node is called.
            Functions should accept at least 'state' and 'config' parameters.
            ToolNode instances handle tool-based operations and provide
            their own execution logic.
        publisher: Optional event publisher for execution monitoring.
            Injected via dependency injection if not explicitly provided.
            Used for publishing node execution events and status updates.

    Note:
        The function signature is automatically analyzed to determine
        required parameters and dependency injection points. Parameters
        matching injectable service names will be automatically provided
        by the framework during execution.
    """
    logger.debug(
        "Initializing node '%s' with func=%s",
        name,
        getattr(func, "__name__", type(func).__name__),
    )
    self.name = name
    self.func = func
    self.publisher = publisher
    self.invoke_handler = InvokeNodeHandler(
        name,
        func,
    )

    self.stream_handler = StreamNodeHandler(
        name,
        func,
    )
execute async
execute(config, state, callback_mgr=Inject[CallbackManager])

Execute the node function with comprehensive context and callback support.

Executes the node's function or ToolNode with full dependency injection, callback hook execution, and error handling. This method provides the complete execution environment including state access, configuration, and injected services.

Parameters:

Name Type Description Default
config
dict[str, Any]

Configuration dictionary containing execution context, user settings, thread identification, and runtime parameters.

required
state
AgentState

Current AgentState providing workflow context, message history, and shared state information accessible to the node function.

required
callback_mgr
CallbackManager

Callback manager for executing pre/post execution hooks. Injected via dependency injection if not explicitly provided.

Inject[CallbackManager]

Returns:

Type Description
dict[str, Any] | list[Message]

Either a dictionary containing updated state and execution results,

dict[str, Any] | list[Message]

or a list of Message objects representing the node's output.

dict[str, Any] | list[Message]

The return type depends on the node function's implementation.

Example
# Node function that returns messages
def process_data(state, config):
    result = process(state.data)
    return [Message.text_message(f"Processed: {result}")]


node = Node("processor", process_data)
messages = await node.execute(config, state)
Note

The node function receives dependency-injected parameters based on its signature. Common injectable parameters include 'state', 'config', 'context_manager', 'publisher', and other framework services.

Source code in pyagenity/graph/node.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
async def execute(
    self,
    config: dict[str, Any],
    state: AgentState,
    callback_mgr: CallbackManager = Inject[CallbackManager],
) -> dict[str, Any] | list[Message]:
    """Execute the node function with comprehensive context and callback support.

    Executes the node's function or ToolNode with full dependency injection,
    callback hook execution, and error handling. This method provides the
    complete execution environment including state access, configuration,
    and injected services.

    Args:
        config: Configuration dictionary containing execution context,
            user settings, thread identification, and runtime parameters.
        state: Current AgentState providing workflow context, message history,
            and shared state information accessible to the node function.
        callback_mgr: Callback manager for executing pre/post execution hooks.
            Injected via dependency injection if not explicitly provided.

    Returns:
        Either a dictionary containing updated state and execution results,
        or a list of Message objects representing the node's output.
        The return type depends on the node function's implementation.

    Raises:
        Various exceptions depending on node function behavior. All exceptions
        are handled by the callback manager's error handling hooks before
        being propagated.

    Example:
        ```python
        # Node function that returns messages
        def process_data(state, config):
            result = process(state.data)
            return [Message.text_message(f"Processed: {result}")]


        node = Node("processor", process_data)
        messages = await node.execute(config, state)
        ```

    Note:
        The node function receives dependency-injected parameters based on
        its signature. Common injectable parameters include 'state', 'config',
        'context_manager', 'publisher', and other framework services.
    """
    return await self.invoke_handler.invoke(
        config,
        state,
        callback_mgr,
    )
stream async
stream(config, state, callback_mgr=Inject[CallbackManager])

Execute the node function with streaming output support.

Similar to execute() but designed for streaming scenarios where the node function can produce incremental results. This method provides an async iterator interface over the node's outputs, allowing for real-time processing and response streaming.

Parameters:

Name Type Description Default
config
dict[str, Any]

Configuration dictionary with execution context and settings.

required
state
AgentState

Current AgentState providing workflow context and shared state.

required
callback_mgr
CallbackManager

Callback manager for pre/post execution hook handling.

Inject[CallbackManager]

Yields:

Type Description
AsyncIterable[dict[str, Any] | Message]

Dictionary objects or Message instances representing incremental

AsyncIterable[dict[str, Any] | Message]

outputs from the node function. The exact type and frequency of

AsyncIterable[dict[str, Any] | Message]

yields depends on the node function's streaming implementation.

Example
async def streaming_processor(state, config):
    for item in large_dataset:
        result = process_item(item)
        yield Message.text_message(f"Processed item: {result}")


node = Node("stream_processor", streaming_processor)
async for output in node.stream(config, state):
    print(f"Streamed: {output.content}")
Note

Not all node functions support streaming. For non-streaming functions, this method will yield a single result equivalent to calling execute(). The streaming capability is determined by the node function's implementation.

Source code in pyagenity/graph/node.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
async def stream(
    self,
    config: dict[str, Any],
    state: AgentState,
    callback_mgr: CallbackManager = Inject[CallbackManager],
) -> AsyncIterable[dict[str, Any] | Message]:
    """Execute the node function with streaming output support.

    Similar to execute() but designed for streaming scenarios where the node
    function can produce incremental results. This method provides an async
    iterator interface over the node's outputs, allowing for real-time
    processing and response streaming.

    Args:
        config: Configuration dictionary with execution context and settings.
        state: Current AgentState providing workflow context and shared state.
        callback_mgr: Callback manager for pre/post execution hook handling.

    Yields:
        Dictionary objects or Message instances representing incremental
        outputs from the node function. The exact type and frequency of
        yields depends on the node function's streaming implementation.

    Example:
        ```python
        async def streaming_processor(state, config):
            for item in large_dataset:
                result = process_item(item)
                yield Message.text_message(f"Processed item: {result}")


        node = Node("stream_processor", streaming_processor)
        async for output in node.stream(config, state):
            print(f"Streamed: {output.content}")
        ```

    Note:
        Not all node functions support streaming. For non-streaming functions,
        this method will yield a single result equivalent to calling execute().
        The streaming capability is determined by the node function's implementation.
    """
    result = self.stream_handler.stream(
        config,
        state,
        callback_mgr,
    )

    async for item in result:
        yield item