Skip to content

Stream node handler

Streaming node handler for PyAgenity graph workflows.

This module provides the StreamNodeHandler class, which manages the execution of graph nodes that support streaming output. It handles both regular function nodes and ToolNode instances, enabling incremental result processing, dependency injection, callback management, and event publishing.

StreamNodeHandler is a key component for enabling real-time, chunked, or incremental responses in agent workflows, supporting both synchronous and asynchronous execution patterns.

Classes:

Name Description
StreamNodeHandler

Handles streaming execution for graph nodes in PyAgenity workflows.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

StreamNodeHandler

Bases: BaseLoggingMixin

Handles streaming execution for graph nodes in PyAgenity workflows.

StreamNodeHandler manages the execution of nodes that can produce streaming output, including both regular function nodes and ToolNode instances. It supports dependency injection, callback management, event publishing, and incremental result processing.

Attributes:

Name Type Description
name

Unique identifier for the node within the graph.

func

The function or ToolNode to execute. Determines streaming behavior.

Example
handler = StreamNodeHandler("process", process_function)
async for chunk in handler.stream(config, state):
    print(chunk)

Methods:

Name Description
__init__

Initialize a new StreamNodeHandler instance.

stream

Execute the node function with streaming output and callback support.

Source code in pyagenity/graph/utils/stream_node_handler.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class StreamNodeHandler(BaseLoggingMixin):
    """Handles streaming execution for graph nodes in PyAgenity workflows.

    StreamNodeHandler manages the execution of nodes that can produce streaming output,
    including both regular function nodes and ToolNode instances. It supports dependency
    injection, callback management, event publishing, and incremental result processing.

    Attributes:
        name: Unique identifier for the node within the graph.
        func: The function or ToolNode to execute. Determines streaming behavior.

    Example:
        ```python
        handler = StreamNodeHandler("process", process_function)
        async for chunk in handler.stream(config, state):
            print(chunk)
        ```
    """

    def __init__(
        self,
        name: str,
        func: Union[Callable, "ToolNode"],
    ):
        """Initialize a new StreamNodeHandler instance.

        Args:
            name: Unique identifier for the node within the graph.
            func: The function or ToolNode to execute. Determines streaming behavior.
        """
        self.name = name
        self.func = func

    async def _handle_single_tool(
        self,
        tool_call: dict[str, Any],
        state: AgentState,
        config: dict[str, Any],
    ) -> AsyncIterable[Message]:
        """Execute a single tool call via the underlying ToolNode and yield messages.

        Args:
            tool_call: Tool-call dict (OpenAI style) with an ``id`` and a
                ``function`` entry containing ``name`` and JSON-encoded
                ``arguments``.
            state: Current agent state, forwarded to the tool for injection.
            config: Execution configuration, forwarded to the tool.

        Yields:
            Message instances produced by the tool's stream. Non-Message
            chunks emitted by the tool are filtered out here.

        Raises:
            json.JSONDecodeError: If the tool-call arguments are not valid JSON.
        """
        function_name = tool_call.get("function", {}).get("name", "")
        function_args: dict = json.loads(tool_call.get("function", {}).get("arguments", "{}"))
        tool_call_id = tool_call.get("id", "")

        logger.info(
            "Node '%s' executing tool '%s' with %d arguments",
            self.name,
            function_name,
            len(function_args),
        )
        logger.debug("Tool arguments: %s", function_args)

        # Execute the tool function with injectable parameters.
        # NOTE: this only creates the async generator; the tool actually runs
        # lazily as the generator is iterated below.
        tool_result_gen = self.func.stream(  # type: ignore
            function_name,  # type: ignore
            function_args,
            tool_call_id=tool_call_id,
            state=state,
            config=config,
        )
        logger.debug("Node '%s' tool stream created", self.name)

        async for result in tool_result_gen:
            if isinstance(result, Message):
                yield result

    async def _call_tools(
        self,
        last_message: Message,
        state: "AgentState",
        config: dict[str, Any],
    ) -> AsyncIterable[Message]:
        """Execute every tool call attached to the last message, in order.

        Args:
            last_message: Message expected to carry a non-empty ``tools_calls``
                list.
            state: Current agent state.
            config: Execution configuration.

        Yields:
            Message instances streamed from each tool call.

        Raises:
            NodeError: If the message carries no tool calls.
        """
        logger.debug("Node '%s' calling tools from message", self.name)
        if (
            hasattr(last_message, "tools_calls")
            and last_message.tools_calls
            and len(last_message.tools_calls) > 0
        ):
            # Execute tool calls sequentially, forwarding each streamed message
            for tool_call in last_message.tools_calls:
                result_gen = self._handle_single_tool(
                    tool_call,
                    state,
                    config,
                )
                async for result in result_gen:
                    if isinstance(result, Message):
                        yield result
        else:
            # Nothing to execute: treat as a flow error. Use logger.error
            # (not logger.exception) because no exception is active yet;
            # logger.exception outside an except block logs "NoneType: None".
            logger.error("Node '%s': No tool calls to execute", self.name)
            raise NodeError("No tool calls to execute")

    def _prepare_input_data(
        self,
        state: "AgentState",
        config: dict[str, Any],
    ) -> dict:
        """Build keyword arguments for a regular (non-ToolNode) node function.

        Inspects the function signature and injects ``state``/``config`` where
        declared; parameters with defaults are left to the function itself.

        Args:
            state: Current agent state, injected if the function declares it.
            config: Execution configuration, injected if declared.

        Returns:
            Mapping of parameter name to value for the call.

        Raises:
            TypeError: If the function declares a required parameter other than
                ``state`` or ``config`` that cannot be supplied here.
        """
        sig = inspect.signature(self.func)  # type: ignore Tool node won't come here
        input_data = {}
        default_data = {
            "state": state,
            "config": config,
        }

        # Prepare function arguments (excluding injectable parameters)
        for param_name, param in sig.parameters.items():
            # Skip *args/**kwargs
            if param.kind in (
                inspect.Parameter.VAR_POSITIONAL,
                inspect.Parameter.VAR_KEYWORD,
            ):
                continue

            # Inject the well-known parameters
            if param_name in ["state", "config"]:
                input_data[param_name] = default_data[param_name]
            # Any other parameter without a default cannot be satisfied
            elif param.default is inspect.Parameter.empty:
                raise TypeError(
                    f"Missing required parameter '{param_name}' for function '{self.func}'"
                )

        return input_data

    async def _call_normal_node(  # noqa: PLR0912, PLR0915
        self,
        state: "AgentState",
        config: dict[str, Any],
        callback_mgr: CallbackManager,
    ) -> AsyncIterable[dict[str, Any] | Message]:
        """Execute a regular function node with callbacks and event publishing.

        Runs before/after/error callbacks around the function call, publishes
        progress events, and yields streamed Message chunks followed by a
        terminal summary dict.

        Args:
            state: Current agent state; may be replaced if the function returns
                a Command carrying a new state.
            config: Execution configuration.
            callback_mgr: Manager providing before/after/error hooks.

        Yields:
            Message instances (streamed or final) and one terminal dict with
            keys ``is_non_streaming``, ``messages``, ``next_node`` (plus
            ``state`` on the non-streaming path).
        """
        logger.debug("Node '%s' calling normal function", self.name)
        result: dict[str, Any] | Message = {}

        logger.debug("Node '%s' is a regular function, executing with callbacks", self.name)
        # This is a regular function - likely AI function
        # Create callback context for AI invocation
        context = CallbackContext(
            invocation_type=InvocationType.AI,
            node_name=self.name,
            function_name=getattr(self.func, "__name__", str(self.func)),
            metadata={"config": config},
        )

        # Build the kwargs that will be passed to the node function
        input_data = self._prepare_input_data(
            state,
            config,
        )

        last_message = state.context[-1] if state.context and len(state.context) > 0 else None

        event = EventModel.default(
            config,
            data={"state": state.model_dump()},
            event=Event.NODE_EXECUTION,
            content_type=[ContentType.STATE],
            node_name=self.name,
            extra={
                "node": self.name,
                "function_name": getattr(self.func, "__name__", str(self.func)),
                "last_message": last_message.model_dump() if last_message else None,
            },
        )
        publish_event(event)

        try:
            logger.debug("Node '%s' executing before_invoke callbacks", self.name)
            # Execute before_invoke callbacks
            input_data = await callback_mgr.execute_before_invoke(context, input_data)
            logger.debug("Node '%s' executing function", self.name)
            event.event_type = EventType.PROGRESS
            event.content = "Function execution started"
            publish_event(event)

            # Execute the actual function
            result = await call_sync_or_async(
                self.func,  # type: ignore
                **input_data,
            )
            logger.debug("Node '%s' function execution completed", self.name)

            logger.debug("Node '%s' executing after_invoke callbacks", self.name)
            # Execute after_invoke callbacks
            result = await callback_mgr.execute_after_invoke(context, input_data, result)

            # Convert the response here so upstream handling stays simple.
            #
            # Streaming logic:
            #   * If the result is a Command, unwrap its update/state/goto first.
            #   * If the (unwrapped) result is non-streaming, process it fully,
            #     publish an END event, and yield the final summary dict.
            #   * If the result is a streaming converter, forward its chunks,
            #     then publish an END event and yield the summary dict.
            next_node = None
            final_result = result
            # If it is a Command, unwrap it
            if isinstance(result, Command):
                # now check the updated
                if result.update:
                    final_result = result.update

                if result.state:
                    state = result.state
                    for msg in state.context:
                        yield msg

                next_node = result.goto

            messages = []
            if check_non_streaming(final_result):
                new_state, messages, next_node = await process_node_result(
                    final_result,
                    state,
                    messages,
                )
                event.data["state"] = new_state.model_dump()
                event.event_type = EventType.END
                event.data["messages"] = [m.model_dump() for m in messages] if messages else []
                event.data["next_node"] = next_node
                publish_event(event)
                for m in messages:
                    yield m

                yield {
                    "is_non_streaming": True,
                    "state": new_state,
                    "messages": messages,
                    "next_node": next_node,
                }
                return  # done

            # If the result is a ConverterCall with stream=True, use the converter
            if isinstance(result, ModelResponseConverter) and result.response:
                stream_gen = result.stream(
                    config,
                    node_name=self.name,
                    meta={
                        "function_name": getattr(self.func, "__name__", str(self.func)),
                    },
                )
                # this will return event_model or message
                async for item in stream_gen:
                    if isinstance(item, Message) and not item.delta:
                        messages.append(item)
                    yield item
            # Things are done, so publish event and yield final response
            event.event_type = EventType.END
            if messages:
                final_msg = messages[-1]
                event.data["message"] = final_msg.model_dump()
                # Populate simple content and structured blocks when available
                event.content = (
                    final_msg.text() if isinstance(final_msg.content, list) else final_msg.content
                )
                if isinstance(final_msg.content, list):
                    event.content_blocks = final_msg.content
            else:
                event.data["message"] = None
                event.content = ""
                event.content_blocks = None
            event.content_type = [ContentType.MESSAGE, ContentType.STATE]
            publish_event(event)
            # If the user used a Command with streaming, the next node must be forwarded too
            yield {
                "is_non_streaming": False,
                "messages": messages,
                "next_node": next_node,
            }

        except Exception as e:
            logger.warning(
                "Node '%s' execution failed, executing error callbacks: %s", self.name, e
            )
            # Execute error callbacks
            recovery_result = await callback_mgr.execute_on_error(context, input_data, e)

            if isinstance(recovery_result, Message):
                logger.info(
                    "Node '%s' recovered from error using callback result",
                    self.name,
                )
                # Use recovery result instead of raising the error
                event.event_type = EventType.END
                event.content = "Function execution recovered from error"
                event.data["message"] = recovery_result.model_dump()
                event.content_type = [ContentType.MESSAGE, ContentType.STATE]
                publish_event(event)

                yield recovery_result
            else:
                # Re-raise the original error
                logger.error("Node '%s' could not recover from error", self.name)
                event.event_type = EventType.ERROR
                event.content = f"Function execution failed: {e}"
                event.data["error"] = str(e)
                event.content_type = [ContentType.ERROR, ContentType.STATE]
                publish_event(event)
                raise

    async def stream(
        self,
        config: dict[str, Any],
        state: AgentState,
        callback_mgr: CallbackManager = Inject[CallbackManager],
    ) -> AsyncGenerator[dict[str, Any] | Message]:
        """Execute the node function with streaming output and callback support.

        Handles both ToolNode and regular function nodes, yielding incremental results
        as they become available. Supports dependency injection, callback management,
        and event publishing for monitoring and debugging.

        Args:
            config: Configuration dictionary containing execution context and settings.
            state: Current AgentState providing workflow context and shared state.
            callback_mgr: Callback manager for pre/post execution hook handling.

        Yields:
            Dictionary objects or Message instances representing incremental outputs
            from the node function. The exact type and frequency of yields depends on
            the node function's streaming implementation.

        Raises:
            NodeError: If node execution fails or encounters an error.

        Example:
            ```python
            async for chunk in handler.stream(config, state):
                print(chunk)
            ```
        """
        logger.info("Executing node '%s'", self.name)
        logger.debug(
            "Node '%s' execution with state context size=%d, config keys=%s",
            self.name,
            len(state.context) if state.context else 0,
            list(config.keys()) if config else [],
        )

        # Publishing events is not required in this method itself:
        # ToolNode handles its own events from start to end, so this class
        # only needs to emit events for normal function calls (done inside
        # _call_normal_node). ToolNode yields events from its own stream method.

        try:
            if isinstance(self.func, ToolNode):
                logger.debug("Node '%s' is a ToolNode, executing tool calls", self.name)
                # This is tool execution - handled separately in ToolNode
                if state.context and len(state.context) > 0:
                    last_message = state.context[-1]
                    logger.debug("Node '%s' processing tool calls from last message", self.name)
                    result = self._call_tools(
                        last_message,
                        state,
                        config,
                    )
                    async for item in result:
                        yield item
                else:
                    # No context: there is nothing to execute tools against
                    error_msg = "No context available for tool execution"
                    logger.error("Node '%s': %s", self.name, error_msg)
                    raise NodeError(error_msg)

            else:
                result = self._call_normal_node(
                    state,
                    config,
                    callback_mgr,
                )
                async for item in result:
                    yield item

            logger.info("Node '%s' execution completed successfully", self.name)
        except Exception as e:
            # This is the final catch-all for node execution errors
            logger.exception("Node '%s' execution failed: %s", self.name, e)
            raise NodeError(f"Error in node '{self.name}': {e!s}") from e

Attributes

func instance-attribute
func = func
name instance-attribute
name = name

Functions

__init__
__init__(name, func)

Initialize a new StreamNodeHandler instance.

Parameters:

Name Type Description Default
name
str

Unique identifier for the node within the graph.

required
func
Union[Callable, ToolNode]

The function or ToolNode to execute. Determines streaming behavior.

required
Source code in pyagenity/graph/utils/stream_node_handler.py
62
63
64
65
66
67
68
69
70
71
72
73
74
def __init__(
    self,
    name: str,
    func: Union[Callable, "ToolNode"],
):
    """Create a handler bound to a node name and its executable.

    Args:
        name: Unique identifier for the node within the graph.
        func: The function or ToolNode to execute. Determines streaming behavior.
    """
    self.func = func
    self.name = name
stream async
stream(config, state, callback_mgr=Inject[CallbackManager])

Execute the node function with streaming output and callback support.

Handles both ToolNode and regular function nodes, yielding incremental results as they become available. Supports dependency injection, callback management, and event publishing for monitoring and debugging.

Parameters:

Name Type Description Default
config
dict[str, Any]

Configuration dictionary containing execution context and settings.

required
state
AgentState

Current AgentState providing workflow context and shared state.

required
callback_mgr
CallbackManager

Callback manager for pre/post execution hook handling.

Inject[CallbackManager]

Yields:

Type Description
AsyncGenerator[dict[str, Any] | Message]

Dictionary objects or Message instances representing incremental outputs

AsyncGenerator[dict[str, Any] | Message]

from the node function. The exact type and frequency of yields depends on

AsyncGenerator[dict[str, Any] | Message]

the node function's streaming implementation.

Raises:

Type Description
NodeError

If node execution fails or encounters an error.

Example
async for chunk in handler.stream(config, state):
    print(chunk)
Source code in pyagenity/graph/utils/stream_node_handler.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
async def stream(
    self,
    config: dict[str, Any],
    state: AgentState,
    callback_mgr: CallbackManager = Inject[CallbackManager],
) -> AsyncGenerator[dict[str, Any] | Message]:
    """Run this node and yield its output incrementally.

    Dispatches to the ToolNode path (tool calls taken from the last context
    message) or to the regular-function path (with callback hooks and event
    publishing), forwarding every chunk the underlying generator produces.

    Args:
        config: Configuration dictionary containing execution context and settings.
        state: Current AgentState providing workflow context and shared state.
        callback_mgr: Callback manager for pre/post execution hook handling.

    Yields:
        Message instances and/or dict payloads, as produced by the node's
        streaming implementation.

    Raises:
        NodeError: Wraps any failure raised during node execution.

    Example:
        ```python
        async for chunk in handler.stream(config, state):
            print(chunk)
        ```
    """
    logger.info("Executing node '%s'", self.name)
    logger.debug(
        "Node '%s' execution with state context size=%d, config keys=%s",
        self.name,
        len(state.context) if state.context else 0,
        list(config.keys()) if config else [],
    )

    # No events are published directly here: the ToolNode path manages its
    # own events end-to-end, and the normal-function path emits them from
    # _call_normal_node.

    try:
        if isinstance(self.func, ToolNode):
            logger.debug("Node '%s' is a ToolNode, executing tool calls", self.name)
            # Guard: tool execution needs at least one context message.
            if not (state.context and len(state.context) > 0):
                error_msg = "No context available for tool execution"
                logger.error("Node '%s': %s", self.name, error_msg)
                raise NodeError(error_msg)

            latest = state.context[-1]
            logger.debug("Node '%s' processing tool calls from last message", self.name)
            async for chunk in self._call_tools(latest, state, config):
                yield chunk
        else:
            # Regular function node: delegate to the callback-aware runner.
            async for chunk in self._call_normal_node(state, config, callback_mgr):
                yield chunk

        logger.info("Node '%s' execution completed successfully", self.name)
    except Exception as e:
        # Final catch-all: wrap anything that escaped in a NodeError.
        logger.exception("Node '%s' execution failed: %s", self.name, e)
        raise NodeError(f"Error in node '{self.name}': {e!s}") from e

Functions