Utils
Modules:
Name | Description |
---|---|
handler_mixins |
Shared mixins for graph and node handler classes. |
invoke_handler |
|
invoke_node_handler |
InvokeNodeHandler utilities for PyAgenity agent graph execution. |
stream_handler |
Streaming graph execution handler for PyAgenity workflows. |
stream_node_handler |
Streaming node handler for PyAgenity graph workflows. |
stream_utils |
Streaming utility functions for PyAgenity graph workflows. |
utils |
Core utility functions for graph execution and state management. |
Modules¶
handler_mixins
¶
Shared mixins for graph and node handler classes.
This module provides lightweight mixins that add common functionality to handler classes without changing their core runtime behavior. The mixins follow the composition pattern to keep responsibilities explicit and allow handlers to inherit only the capabilities they need.
The mixins provide structured logging, configuration management, and other cross-cutting concerns that are commonly needed across different handler types. By using mixins, the core handler logic remains focused while gaining these shared capabilities.
Typical usage
class MyHandler(BaseLoggingMixin, InterruptConfigMixin):
def __init__(self):
self._set_interrupts(["node1"], ["node2"])
self._log_start("Handler initialized")
Classes:
Name | Description |
---|---|
BaseLoggingMixin |
Provides structured logging helpers for handler classes. |
InterruptConfigMixin |
Manages interrupt configuration for graph-level execution handlers. |
Classes¶
BaseLoggingMixin
¶
Provides structured logging helpers for handler classes.
This mixin adds consistent logging capabilities to handler classes without requiring them to manage logger instances directly. It automatically creates loggers based on the module name and provides convenience methods for common logging operations.
The mixin is designed to be lightweight and non-intrusive, adding only logging functionality without affecting the core behavior of the handler.
Attributes:
Name | Type | Description |
---|---|---|
_logger |
Logger
|
Cached logger instance for the handler class. |
Example
class MyHandler(BaseLoggingMixin):
def process(self):
self._log_start("Processing started")
try:
# Do work
self._log_debug("Work completed successfully")
except Exception as e:
self._log_error("Processing failed: %s", e)
Source code in pyagenity/graph/utils/handler_mixins.py
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
|
InterruptConfigMixin
¶
Manages interrupt configuration for graph-level execution handlers.
This mixin provides functionality to store and manage interrupt points configuration for graph execution. Interrupts allow graph execution to be paused before or after specific nodes for debugging, human intervention, or checkpoint creation.
The mixin maintains separate lists for "before" and "after" interrupts, allowing fine-grained control over when graph execution should pause.
Attributes:
Name | Type | Description |
---|---|---|
interrupt_before |
list[str] | None
|
List of node names where execution should pause before node execution begins. |
interrupt_after |
list[str] | None
|
List of node names where execution should pause after node execution completes. |
Example
class GraphHandler(InterruptConfigMixin):
def __init__(self):
self._set_interrupts(
interrupt_before=["approval_node"], interrupt_after=["data_processing"]
)
Source code in pyagenity/graph/utils/handler_mixins.py
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
|
invoke_handler
¶
Classes:
Name | Description |
---|---|
InvokeHandler |
|
Attributes:
Name | Type | Description |
---|---|---|
StateT |
|
|
logger |
|
Attributes¶
Classes¶
InvokeHandler
¶
Bases: BaseLoggingMixin
, InterruptConfigMixin
Methods:
Name | Description |
---|---|
__init__ |
|
invoke |
Execute the graph asynchronously with event publishing. |
Attributes:
Name | Type | Description |
---|---|---|
edges |
list[Edge]
|
|
interrupt_after |
|
|
interrupt_before |
|
|
nodes |
dict[str, Node]
|
|
Source code in pyagenity/graph/utils/invoke_handler.py
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
|
Attributes¶
Functions¶
__init__
¶__init__(nodes, edges, interrupt_before=None, interrupt_after=None)
Source code in pyagenity/graph/utils/invoke_handler.py
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
|
invoke
async
¶invoke(input_data, config, default_state, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with event publishing.
Source code in pyagenity/graph/utils/invoke_handler.py
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
|
Functions¶
invoke_node_handler
¶
InvokeNodeHandler utilities for PyAgenity agent graph execution.
This module provides the InvokeNodeHandler class, which manages the invocation of node functions and tool nodes within the agent graph. It supports dependency injection, callback hooks, event publishing, and error recovery for both regular and tool-based nodes.
Classes:
Name | Description |
---|---|
InvokeNodeHandler |
Handles execution of node functions and tool nodes with DI and callbacks. |
Usage
handler = InvokeNodeHandler(name, func, publisher) result = await handler.invoke(config, state)
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
InvokeNodeHandler
¶
Bases: BaseLoggingMixin
Handles invocation of node functions and tool nodes in the agent graph.
Supports dependency injection, callback hooks, event publishing, and error recovery.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
str
|
Name of the node. |
required |
|
Callable | ToolNode
|
The function or ToolNode to execute. |
required |
|
BasePublisher
|
Event publisher for execution events. |
Inject[BasePublisher]
|
Methods:
Name | Description |
---|---|
__init__ |
|
clear_signature_cache |
Clear the function signature cache. Useful for testing or memory management. |
invoke |
Execute the node function or ToolNode with dependency injection and callback hooks. |
Attributes:
Name | Type | Description |
---|---|---|
func |
|
|
name |
|
|
publisher |
|
Source code in pyagenity/graph/utils/invoke_node_handler.py
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
|
Attributes¶
Functions¶
__init__
¶__init__(name, func, publisher=Inject[BasePublisher])
Source code in pyagenity/graph/utils/invoke_node_handler.py
65 66 67 68 69 70 71 72 73 |
|
clear_signature_cache
classmethod
¶clear_signature_cache()
Clear the function signature cache. Useful for testing or memory management.
Source code in pyagenity/graph/utils/invoke_node_handler.py
60 61 62 63 |
|
invoke
async
¶invoke(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function or ToolNode with dependency injection and callback hooks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict
|
Node configuration. |
required |
state
¶ |
AgentState
|
Current agent state. |
required |
callback_mgr
¶ |
CallbackManager
|
Callback manager for hooks. |
Inject[CallbackManager]
|
Returns:
Type | Description |
---|---|
dict[str, Any] | list[Message]
|
dict | list[Message]: Result of node execution (regular node or tool node). |
Raises:
Type | Description |
---|---|
NodeError
|
If execution fails or context is missing for tool nodes. |
Source code in pyagenity/graph/utils/invoke_node_handler.py
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
|
Functions¶
stream_handler
¶
Streaming graph execution handler for PyAgenity workflows.
This module provides the StreamHandler class, which manages the execution of graph workflows with support for streaming output, interrupts, state persistence, and event publishing. It enables incremental result processing, pause/resume capabilities, and robust error handling for agent workflows that require real-time or chunked responses.
Classes:
Name | Description |
---|---|
StreamHandler |
Handles streaming execution for graph workflows in PyAgenity. |
Attributes:
Name | Type | Description |
---|---|---|
StateT |
|
|
logger |
|
Attributes¶
Classes¶
StreamHandler
¶
Bases: BaseLoggingMixin
, InterruptConfigMixin
Handles streaming execution for graph workflows in PyAgenity.
StreamHandler manages the execution of agent workflows as directed graphs, supporting streaming output, pause/resume via interrupts, state persistence, and event publishing for monitoring and debugging. It enables incremental result processing and robust error handling for complex agent workflows.
Attributes:
Name | Type | Description |
---|---|---|
nodes |
dict[str, Node]
|
Dictionary mapping node names to Node instances. |
edges |
list[Edge]
|
List of Edge instances defining graph connections and routing. |
interrupt_before |
List of node names where execution should pause before execution. |
|
interrupt_after |
List of node names where execution should pause after execution. |
Example
handler = StreamHandler(nodes, edges)
async for chunk in handler.stream(input_data, config, state):
print(chunk)
Methods:
Name | Description |
---|---|
__init__ |
|
stream |
Execute the graph asynchronously with streaming output. |
Source code in pyagenity/graph/utils/stream_handler.py
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
|
Attributes¶
Functions¶
__init__
¶__init__(nodes, edges, interrupt_before=None, interrupt_after=None)
Source code in pyagenity/graph/utils/stream_handler.py
71 72 73 74 75 76 77 78 79 80 81 82 83 |
|
stream
async
¶stream(input_data, config, default_state, response_granularity=ResponseGranularity.LOW)
Execute the graph asynchronously with streaming output.
Runs the graph workflow from start to finish, yielding incremental results as they become available. Automatically detects whether to start a fresh execution or resume from an interrupted state, supporting pause/resume and checkpointing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data
¶ |
dict[str, Any]
|
Input dictionary for graph execution. For new executions, should contain 'messages' key with initial messages. For resumed executions, can contain additional data to merge. |
required |
config
¶ |
dict[str, Any]
|
Configuration dictionary containing execution settings and context. |
required |
default_state
¶ |
StateT
|
Initial or template AgentState for workflow execution. |
required |
response_granularity
¶ |
ResponseGranularity
|
Level of detail in the response (LOW, PARTIAL, FULL). |
LOW
|
Yields:
Type | Description |
---|---|
AsyncGenerator[Message]
|
Message objects representing incremental results from graph execution. |
AsyncGenerator[Message]
|
The exact type and frequency of yields depends on node implementations |
AsyncGenerator[Message]
|
and workflow configuration. |
Raises:
Type | Description |
---|---|
GraphRecursionError
|
If execution exceeds recursion limit. |
ValueError
|
If input_data is invalid for new execution. |
Various exceptions
|
Depending on node execution failures. |
Example
async for chunk in handler.stream(input_data, config, state):
print(chunk)
Source code in pyagenity/graph/utils/stream_handler.py
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
|
Functions¶
stream_node_handler
¶
Streaming node handler for PyAgenity graph workflows.
This module provides the StreamNodeHandler class, which manages the execution of graph nodes that support streaming output. It handles both regular function nodes and ToolNode instances, enabling incremental result processing, dependency injection, callback management, and event publishing.
StreamNodeHandler is a key component for enabling real-time, chunked, or incremental responses in agent workflows, supporting both synchronous and asynchronous execution patterns.
Classes:
Name | Description |
---|---|
StreamNodeHandler |
Handles streaming execution for graph nodes in PyAgenity workflows. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
StreamNodeHandler
¶
Bases: BaseLoggingMixin
Handles streaming execution for graph nodes in PyAgenity workflows.
StreamNodeHandler manages the execution of nodes that can produce streaming output, including both regular function nodes and ToolNode instances. It supports dependency injection, callback management, event publishing, and incremental result processing.
Attributes:
Name | Type | Description |
---|---|---|
name |
Unique identifier for the node within the graph. |
|
func |
The function or ToolNode to execute. Determines streaming behavior. |
Example
handler = StreamNodeHandler("process", process_function)
async for chunk in handler.stream(config, state):
print(chunk)
Methods:
Name | Description |
---|---|
__init__ |
Initialize a new StreamNodeHandler instance. |
stream |
Execute the node function with streaming output and callback support. |
Source code in pyagenity/graph/utils/stream_node_handler.py
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 |
|
Attributes¶
Functions¶
__init__
¶__init__(name, func)
Initialize a new StreamNodeHandler instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
¶ |
str
|
Unique identifier for the node within the graph. |
required |
func
¶ |
Union[Callable, ToolNode]
|
The function or ToolNode to execute. Determines streaming behavior. |
required |
Source code in pyagenity/graph/utils/stream_node_handler.py
62 63 64 65 66 67 68 69 70 71 72 73 74 |
|
stream
async
¶stream(config, state, callback_mgr=Inject[CallbackManager])
Execute the node function with streaming output and callback support.
Handles both ToolNode and regular function nodes, yielding incremental results as they become available. Supports dependency injection, callback management, and event publishing for monitoring and debugging.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any]
|
Configuration dictionary containing execution context and settings. |
required |
state
¶ |
AgentState
|
Current AgentState providing workflow context and shared state. |
required |
callback_mgr
¶ |
CallbackManager
|
Callback manager for pre/post execution hook handling. |
Inject[CallbackManager]
|
Yields:
Type | Description |
---|---|
AsyncGenerator[dict[str, Any] | Message]
|
Dictionary objects or Message instances representing incremental outputs |
AsyncGenerator[dict[str, Any] | Message]
|
from the node function. The exact type and frequency of yields depends on |
AsyncGenerator[dict[str, Any] | Message]
|
the node function's streaming implementation. |
Raises:
Type | Description |
---|---|
NodeError
|
If node execution fails or encounters an error. |
Example
async for chunk in handler.stream(config, state):
print(chunk)
Source code in pyagenity/graph/utils/stream_node_handler.py
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 |
|
Functions¶
stream_utils
¶
Streaming utility functions for PyAgenity graph workflows.
This module provides helper functions for determining whether a result from a node or tool execution should be treated as non-streaming (i.e., a complete result) or processed incrementally as a stream. These utilities are used throughout the graph execution engine to support both synchronous and streaming workflows.
Functions:
Name | Description |
---|---|
check_non_streaming |
Determine if a result should be treated as non-streaming. |
Classes¶
Functions¶
check_non_streaming
¶
check_non_streaming(result)
Determine if a result should be treated as non-streaming.
Checks whether the given result is a complete, non-streaming output (such as a list, dict, string, Message, or AgentState) or if it should be processed incrementally as a stream.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
The result object returned from a node or tool execution. Can be any type. |
required |
Returns:
Name | Type | Description |
---|---|---|
bool |
bool
|
True if the result is non-streaming and should be processed as a complete output; |
bool
|
False if the result should be handled as a stream. |
Example
check_non_streaming([Message.text_message("done")]) True check_non_streaming(Message.text_message("done")) True check_non_streaming({"choices": [...]}) True check_non_streaming("some text") True
Source code in pyagenity/graph/utils/stream_utils.py
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
|
utils
¶
Core utility functions for graph execution and state management.
This module provides essential utilities for PyAgenity graph execution, including state management, message processing, response formatting, and execution flow control. These functions handle the low-level operations that support graph workflow execution.
The utilities in this module are designed to work with PyAgenity's dependency injection system and provide consistent interfaces for common operations across different execution contexts.
Key functionality areas: - State loading, creation, and synchronization - Message processing and deduplication - Response formatting based on granularity levels - Node execution result processing - Interrupt handling and execution flow control
Functions:
Name | Description |
---|---|
call_realtime_sync |
Call the realtime state sync hook if provided. |
check_and_handle_interrupt |
Check for interrupts and save state if needed. Returns True if interrupted. |
get_next_node |
Get the next node to execute based on edges. |
load_or_create_state |
Load existing state from checkpointer or create new state. |
parse_response |
Parse and format execution response based on specified granularity level. |
process_node_result |
Processes the result from a node execution, updating the agent state, message list, |
reload_state |
Load existing state from checkpointer or create new state. |
sync_data |
Sync the current state and messages to the checkpointer. |
Attributes:
Name | Type | Description |
---|---|---|
StateT |
|
|
logger |
|
Attributes¶
Classes¶
Functions¶
call_realtime_sync
async
¶
call_realtime_sync(state, config, checkpointer=Inject[BaseCheckpointer])
Call the realtime state sync hook if provided.
Source code in pyagenity/graph/utils/utils.py
460 461 462 463 464 465 466 467 468 469 |
|
check_and_handle_interrupt
async
¶
check_and_handle_interrupt(interrupt_before, interrupt_after, current_node, interrupt_type, state, config, _sync_data)
Check for interrupts and save state if needed. Returns True if interrupted.
Source code in pyagenity/graph/utils/utils.py
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
|
get_next_node
¶
get_next_node(current_node, state, edges)
Get the next node to execute based on edges.
Source code in pyagenity/graph/utils/utils.py
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
|
load_or_create_state
async
¶
load_or_create_state(input_data, config, old_state, checkpointer=Inject[BaseCheckpointer])
Load existing state from checkpointer or create new state.
Attempts to fetch a realtime-synced state first, then falls back to
the persistent checkpointer. If no existing state is found, creates
a new state from the StateGraph
's prototype state and merges any
incoming messages. Supports partial state update via 'state' in input_data.
Source code in pyagenity/graph/utils/utils.py
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
|
parse_response
async
¶
parse_response(state, messages, response_granularity=ResponseGranularity.LOW)
Parse and format execution response based on specified granularity level.
Formats the final response from graph execution according to the requested granularity level, allowing clients to receive different levels of detail depending on their needs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
AgentState
|
The final agent state after graph execution. |
required |
|
list[Message]
|
List of messages generated during execution. |
required |
|
ResponseGranularity
|
Level of detail to include in the response: - FULL: Returns complete state object and all messages - PARTIAL: Returns context, summary, and messages - LOW: Returns only the messages (default) |
LOW
|
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dictionary containing the formatted response with keys depending on |
dict[str, Any]
|
granularity level. Always includes 'messages' key with execution results. |
Example
# LOW granularity (default)
response = await parse_response(state, messages)
# Returns: {"messages": [Message(...), ...]}
# FULL granularity
response = await parse_response(state, messages, ResponseGranularity.FULL)
# Returns: {"state": AgentState(...), "messages": [Message(...), ...]}
Source code in pyagenity/graph/utils/utils.py
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
|
process_node_result
async
¶
process_node_result(result, state, messages)
Processes the result from a node execution, updating the agent state, message list, and determining the next node.
Supports: - Handling results of type Command, AgentState, Message, list, str, dict, or other types. - Deduplicating messages by message_id. - Updating the agent state and its context with new messages. - Extracting navigation information (next node) from Command results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
Any
|
The output from a node execution. Can be a Command, AgentState, Message, list, str, dict, ModelResponse, or other types. |
required |
|
StateT
|
The current agent state. |
required |
|
list[Message]
|
The list of messages accumulated so far. |
required |
Returns:
Type | Description |
---|---|
tuple[StateT, list[Message], str | None]
|
tuple[StateT, list[Message], str | None]: - The updated agent state. - The updated list of messages (with new, unique messages added). - The identifier of the next node to execute, if specified; otherwise, None. |
Source code in pyagenity/graph/utils/utils.py
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 |
|
reload_state
async
¶
reload_state(config, old_state, checkpointer=Inject[BaseCheckpointer])
Load existing state from checkpointer or create new state.
Attempts to fetch a realtime-synced state first, then falls back to
the persistent checkpointer. If no existing state is found, creates
a new state from the StateGraph
's prototype state and merges any
incoming messages. Supports partial state update via 'state' in input_data.
Source code in pyagenity/graph/utils/utils.py
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
|
sync_data
async
¶
sync_data(state, config, messages, trim=False, checkpointer=Inject[BaseCheckpointer], context_manager=Inject[BaseContextManager])
Sync the current state and messages to the checkpointer.
Source code in pyagenity/graph/utils/utils.py
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
|