Skip to content

State graph

Classes:

Name Description
StateGraph

Main graph class for orchestrating multi-agent workflows.

Attributes:

Name Type Description
StateT
logger

Attributes

StateT module-attribute

StateT = TypeVar('StateT', bound=AgentState)

logger module-attribute

logger = getLogger(__name__)

Classes

StateGraph

Main graph class for orchestrating multi-agent workflows.

This class provides the core functionality for building and managing stateful agent workflows. It is similar to LangGraph's StateGraph integration with support for dependency injection.

The graph is generic over state types to support custom AgentState subclasses, allowing for type-safe state management throughout the workflow execution.

Attributes:

Name Type Description
state StateT

The current state of the graph workflow.

nodes dict[str, Node]

Collection of nodes in the graph.

edges list[Edge]

Collection of edges connecting nodes.

entry_point str | None

Name of the starting node for execution.

context_manager BaseContextManager[StateT] | None

Optional context manager for handling cross-node state operations.

dependency_container DependencyContainer

Container for managing dependencies that can be injected into node functions.

compiled bool

Whether the graph has been compiled for execution.

Example

graph = StateGraph() graph.add_node("process", process_function) graph.add_edge(START, "process") graph.add_edge("process", END) compiled = graph.compile() result = compiled.invoke({"input": "data"})

Methods:

Name Description
__init__

Initialize a new StateGraph instance.

add_conditional_edges

Add conditional routing between nodes based on runtime evaluation.

add_edge

Add a static edge between two nodes.

add_node

Add a node to the graph.

compile

Compile the graph for execution.

set_entry_point

Set the entry point for the graph.

Source code in pyagenity/graph/state_graph.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class StateGraph[StateT: AgentState]:
    """Main graph class for orchestrating multi-agent workflows.

    This class provides the core functionality for building and managing stateful
    agent workflows. It is similar to LangGraph's StateGraph
    integration with support for dependency injection.

    The graph is generic over state types to support custom AgentState subclasses,
    allowing for type-safe state management throughout the workflow execution.

    Attributes:
        state (StateT): The current state of the graph workflow.
        nodes (dict[str, Node]): Collection of nodes in the graph.
        edges (list[Edge]): Collection of edges connecting nodes.
        entry_point (str | None): Name of the starting node for execution.
        context_manager (BaseContextManager[StateT] | None): Optional context manager
            for handling cross-node state operations.
        dependency_container (DependencyContainer): Container for managing
            dependencies that can be injected into node functions.
        compiled (bool): Whether the graph has been compiled for execution.

    Example:
        >>> graph = StateGraph()
        >>> graph.add_node("process", process_function)
        >>> graph.add_edge(START, "process")
        >>> graph.add_edge("process", END)
        >>> compiled = graph.compile()
        >>> result = compiled.invoke({"input": "data"})
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
        thread_name_generator: Callable[[], str] | None = None,
    ):
        """Initialize a new StateGraph instance.

        Args:
            state: Initial state for the graph. If None, a default AgentState
                will be created.
            context_manager: Optional context manager for handling cross-node
                state operations and advanced state management patterns.
            dependency_container: Container for managing dependencies that can
                be injected into node functions. If None, a new empty container
                will be created.
            publisher: Publisher for emitting events during execution

        Note:
            START and END nodes are automatically added to the graph upon
            initialization and accept the full node signature including
            dependencies.

        Example:
            # Basic usage with default AgentState
            >>> graph = StateGraph()

            # With custom state
            >>> custom_state = MyCustomState()
            >>> graph = StateGraph(custom_state)

            # Or using type hints for clarity
            >>> graph = StateGraph[MyCustomState](MyCustomState())
        """
        logger.info("Initializing StateGraph")
        logger.debug(
            "StateGraph init with state=%s, context_manager=%s",
            type(state).__name__ if state else "default AgentState",
            type(context_manager).__name__ if context_manager else None,
        )

        # State handling
        self._state: StateT = state if state else AgentState()  # type: ignore[assignment]

        # Graph structure
        self.nodes: dict[str, Node] = {}
        self.edges: list[Edge] = []
        self.entry_point: str | None = None

        # Services
        self._publisher: BasePublisher | None = publisher
        self._id_generator: BaseIDGenerator = id_generator
        self._context_manager: BaseContextManager[StateT] | None = context_manager
        self.thread_name_generator = thread_name_generator
        # save container for dependency injection
        # if any container is passed then we will activate that
        # otherwise we can skip it and use the default one
        if container is None:
            self._container = InjectQ.get_instance()
            logger.debug("No container provided, using global singleton instance")
        else:
            logger.debug("Using provided dependency container instance")
            self._container = container
            self._container.activate()

        # Register task_manager, for async tasks
        # This will be used to run background tasks
        self._task_manager = BackgroundTaskManager()

        # now setup the graph
        self._setup()

        # Add START and END nodes (accept full node signature including dependencies)
        logger.debug("Adding default START and END nodes")
        self.nodes[START] = Node(START, lambda state, config, **deps: state, self._publisher)  # type: ignore
        self.nodes[END] = Node(END, lambda state, config, **deps: state, self._publisher)
        logger.debug("StateGraph initialized with %d nodes", len(self.nodes))

    def _setup(self):
        """Setup the graph before compilation.

        This method can be used to perform any necessary setup or validation
        before compiling the graph for execution.
        """
        logger.info("Setting up StateGraph before compilation")
        # Placeholder for any setup logic needed before compilation
        # register dependencies

        # register state and context manager as singletons (these are nullable)
        self._container.bind_instance(
            BaseContextManager,
            self._context_manager,
            allow_none=True,
            allow_concrete=True,
        )
        self._container.bind_instance(
            BasePublisher,
            self._publisher,
            allow_none=True,
            allow_concrete=True,
        )

        # register id generator as factory
        self._container.bind_instance(
            BaseIDGenerator,
            self._id_generator,
            allow_concrete=True,
        )
        self._container.bind("generated_id_type", self._id_generator.id_type)
        # Allow async method also
        self._container.bind_factory(
            "generated_id",
            lambda: self._id_generator.generate(),
        )

        # Attach Thread name generator if provided
        if self.thread_name_generator is None:
            self.thread_name_generator = generate_dummy_thread_name

        generator = self.thread_name_generator or generate_dummy_thread_name

        self._container.bind_factory(
            "generated_thread_name",
            lambda: generator(),
        )

        # Save BackgroundTaskManager
        self._container.bind_instance(
            BackgroundTaskManager,
            self._task_manager,
            allow_concrete=False,
        )

    def add_node(
        self,
        name_or_func: str | Callable,
        func: Union[Callable, "ToolNode", None] = None,
    ) -> "StateGraph":
        """Add a node to the graph.

        This method supports two calling patterns:
        1. Pass a callable as the first argument (name inferred from function name)
        2. Pass a name string and callable/ToolNode as separate arguments

        Args:
            name_or_func: Either the node name (str) or a callable function.
                If callable, the function name will be used as the node name.
            func: The function or ToolNode to execute. Required if name_or_func
                is a string, ignored if name_or_func is callable.

        Returns:
            StateGraph: The graph instance for method chaining.

        Raises:
            ValueError: If invalid arguments are provided.

        Example:
            >>> # Method 1: Function name inferred
            >>> graph.add_node(my_function)
            >>> # Method 2: Explicit name and function
            >>> graph.add_node("process", my_function)
        """
        if callable(name_or_func) and func is None:
            # Function passed as first argument
            name = name_or_func.__name__
            func = name_or_func
            logger.debug("Adding node '%s' with inferred name from function", name)
        elif isinstance(name_or_func, str) and (callable(func) or isinstance(func, ToolNode)):
            # Name and function passed separately
            name = name_or_func
            logger.debug(
                "Adding node '%s' with explicit name and %s",
                name,
                "ToolNode" if isinstance(func, ToolNode) else "callable",
            )
        else:
            error_msg = "Invalid arguments for add_node"
            logger.error(error_msg)
            raise ValueError(error_msg)

        self.nodes[name] = Node(name, func)
        logger.info("Added node '%s' to graph (total nodes: %d)", name, len(self.nodes))
        return self

    def add_edge(
        self,
        from_node: str,
        to_node: str,
    ) -> "StateGraph":
        """Add a static edge between two nodes.

        Creates a direct connection from one node to another. If the source
        node is START, the target node becomes the entry point for the graph.

        Args:
            from_node: Name of the source node.
            to_node: Name of the target node.

        Returns:
            StateGraph: The graph instance for method chaining.

        Example:
            >>> graph.add_edge("node1", "node2")
            >>> graph.add_edge(START, "entry_node")  # Sets entry point
        """
        logger.debug("Adding edge from '%s' to '%s'", from_node, to_node)
        # Set entry point if edge is from START
        if from_node == START:
            self.entry_point = to_node
            logger.info("Set entry point to '%s'", to_node)
        self.edges.append(Edge(from_node, to_node))
        logger.debug("Added edge (total edges: %d)", len(self.edges))
        return self

    def add_conditional_edges(
        self,
        from_node: str,
        condition: Callable,
        path_map: dict[str, str] | None = None,
    ) -> "StateGraph":
        """Add conditional routing between nodes based on runtime evaluation.

        Creates dynamic routing logic where the next node is determined by evaluating
        a condition function against the current state. This enables complex branching
        logic, decision trees, and adaptive workflow routing.

        Args:
            from_node: Name of the source node where the condition is evaluated.
            condition: Callable function that takes the current AgentState and returns
                a value used for routing decisions. Should be deterministic and
                side-effect free.
            path_map: Optional dictionary mapping condition results to destination nodes.
                If provided, the condition's return value is looked up in this mapping.
                If None, the condition should return the destination node name directly.

        Returns:
            StateGraph: The graph instance for method chaining.

        Raises:
            ValueError: If the condition function or path_map configuration is invalid.

        Example:
            ```python
            # Direct routing - condition returns node name
            def route_by_priority(state):
                priority = state.data.get("priority", "normal")
                return "urgent_handler" if priority == "high" else "normal_handler"


            graph.add_conditional_edges("classifier", route_by_priority)


            # Mapped routing - condition result mapped to nodes
            def get_category(state):
                return state.data.get("category", "default")


            category_map = {
                "finance": "finance_processor",
                "legal": "legal_processor",
                "default": "general_processor",
            }
            graph.add_conditional_edges("categorizer", get_category, category_map)
            ```

        Note:
            The condition function receives the current AgentState and should return
            consistent results for the same state. If using path_map, ensure the
            condition's return values match the map keys exactly.
        """
        """Add conditional edges from a node based on a condition function.

        Creates edges that are traversed based on the result of a condition
        function. The condition function receives the current state and should
        return a value that determines which edge to follow.

        Args:
            from_node: Name of the source node.
            condition: Function that evaluates the current state and returns
                a value to determine the next node.
            path_map: Optional mapping from condition results to target nodes.
                If provided, creates multiple conditional edges. If None,
                creates a single conditional edge.

        Returns:
            StateGraph: The graph instance for method chaining.

        Example:
            >>> def route_condition(state):
            ...     return "success" if state.success else "failure"
            >>> graph.add_conditional_edges(
            ...     "processor",
            ...     route_condition,
            ...     {"success": "next_step", "failure": "error_handler"},
            ... )
        """
        # Create edges based on possible returns from condition function
        logger.debug(
            "Node '%s' adding conditional edges with path_map: %s",
            from_node,
            path_map,
        )
        if path_map:
            logger.debug(
                "Node '%s' adding conditional edges with path_map: %s", from_node, path_map
            )
            for condition_result, target_node in path_map.items():
                edge = Edge(from_node, target_node, condition)
                edge.condition_result = condition_result
                self.edges.append(edge)
        else:
            # Single conditional edge
            logger.debug("Node '%s' adding single conditional edge", from_node)
            self.edges.append(Edge(from_node, "", condition))
        return self

    def set_entry_point(self, node_name: str) -> "StateGraph":
        """Set the entry point for the graph."""
        self.entry_point = node_name
        self.add_edge(START, node_name)
        logger.info("Set entry point to '%s'", node_name)
        return self

    def compile(
        self,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> "CompiledGraph[StateT]":
        """Compile the graph for execution.

        Args:
            checkpointer: Checkpointer for state persistence
            store: Store for additional data
            debug: Enable debug mode
            interrupt_before: List of node names to interrupt before execution
            interrupt_after: List of node names to interrupt after execution
            callback_manager: Callback manager for executing hooks
        """
        logger.info(
            "Compiling graph with %d nodes, %d edges, entry_point='%s'",
            len(self.nodes),
            len(self.edges),
            self.entry_point,
        )
        logger.debug(
            "Compile options: interrupt_before=%s, interrupt_after=%s",
            interrupt_before,
            interrupt_after,
        )

        if not self.entry_point:
            error_msg = "No entry point set. Use set_entry_point() or add an edge from START."
            logger.error(error_msg)
            raise GraphError(error_msg)

        # Validate graph structure
        logger.debug("Validating graph structure")
        self._validate_graph()
        logger.debug("Graph structure validated successfully")

        # Validate interrupt node names
        interrupt_before = interrupt_before or []
        interrupt_after = interrupt_after or []

        all_interrupt_nodes = set(interrupt_before + interrupt_after)
        invalid_nodes = all_interrupt_nodes - set(self.nodes.keys())
        if invalid_nodes:
            error_msg = f"Invalid interrupt nodes: {invalid_nodes}. Must be existing node names."
            logger.error(error_msg)
            raise GraphError(error_msg)

        self.compiled = True
        logger.info("Graph compilation completed successfully")
        # Import here to avoid circular import at module import time
        # Now update Checkpointer
        if checkpointer is None:
            from pyagenity.checkpointer import InMemoryCheckpointer

            checkpointer = InMemoryCheckpointer[StateT]()
            logger.debug("No checkpointer provided, using InMemoryCheckpointer")

        # Import the CompiledGraph class
        from .compiled_graph import CompiledGraph

        # Setup dependencies
        self._container.bind_instance(
            BaseCheckpointer,
            checkpointer,
            allow_concrete=True,
        )  # not null as we set default
        self._container.bind_instance(
            BaseStore,
            store,
            allow_none=True,
            allow_concrete=True,
        )
        self._container.bind_instance(
            CallbackManager,
            callback_manager,
            allow_concrete=True,
        )  # not null as we set default
        self._container.bind("interrupt_before", interrupt_before)
        self._container.bind("interrupt_after", interrupt_after)
        self._container.bind_instance(StateGraph, self)

        app = CompiledGraph(
            state=self._state,
            interrupt_after=interrupt_after,
            interrupt_before=interrupt_before,
            state_graph=self,
            checkpointer=checkpointer,
            publisher=self._publisher,
            store=store,
            task_manager=self._task_manager,
        )

        self._container.bind(CompiledGraph, app)
        # Compile the Graph, so it will optimize the dependency graph
        self._container.compile()
        return app

    def _validate_graph(self):
        """Validate the graph structure."""
        # Check for orphaned nodes
        connected_nodes = set()
        for edge in self.edges:
            connected_nodes.add(edge.from_node)
            connected_nodes.add(edge.to_node)

        all_nodes = set(self.nodes.keys())
        orphaned = all_nodes - connected_nodes
        if orphaned - {START, END}:  # START and END can be orphaned
            logger.error("Orphaned nodes detected: %s", orphaned - {START, END})
            raise GraphError(f"Orphaned nodes detected: {orphaned - {START, END}}")

        # Check that all edge targets exist
        for edge in self.edges:
            if edge.to_node and edge.to_node not in self.nodes:
                logger.error("Edge '%s' targets non-existent node: %s", edge, edge.to_node)
                raise GraphError(f"Edge targets non-existent node: {edge.to_node}")

Attributes

edges instance-attribute
edges = []
entry_point instance-attribute
entry_point = None
nodes instance-attribute
nodes = {}
thread_name_generator instance-attribute
thread_name_generator = thread_name_generator

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None, thread_name_generator=None)

Initialize a new StateGraph instance.

Parameters:

Name Type Description Default
state
StateT | None

Initial state for the graph. If None, a default AgentState will be created.

None
context_manager
BaseContextManager[StateT] | None

Optional context manager for handling cross-node state operations and advanced state management patterns.

None
dependency_container

Container for managing dependencies that can be injected into node functions. If None, a new empty container will be created.

required
publisher
BasePublisher | None

Publisher for emitting events during execution

None
Note

START and END nodes are automatically added to the graph upon initialization and accept the full node signature including dependencies.

Example
Basic usage with default AgentState

graph = StateGraph()

With custom state

custom_state = MyCustomState() graph = StateGraph(custom_state)

Or using type hints for clarity

graph = StateGraphMyCustomState

Source code in pyagenity/graph/state_graph.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
    thread_name_generator: Callable[[], str] | None = None,
):
    """Initialize a new StateGraph instance.

    Args:
        state: Initial state for the graph. If None, a default AgentState
            will be created.
        context_manager: Optional context manager for handling cross-node
            state operations and advanced state management patterns.
        dependency_container: Container for managing dependencies that can
            be injected into node functions. If None, a new empty container
            will be created.
        publisher: Publisher for emitting events during execution

    Note:
        START and END nodes are automatically added to the graph upon
        initialization and accept the full node signature including
        dependencies.

    Example:
        # Basic usage with default AgentState
        >>> graph = StateGraph()

        # With custom state
        >>> custom_state = MyCustomState()
        >>> graph = StateGraph(custom_state)

        # Or using type hints for clarity
        >>> graph = StateGraph[MyCustomState](MyCustomState())
    """
    logger.info("Initializing StateGraph")
    logger.debug(
        "StateGraph init with state=%s, context_manager=%s",
        type(state).__name__ if state else "default AgentState",
        type(context_manager).__name__ if context_manager else None,
    )

    # State handling
    self._state: StateT = state if state else AgentState()  # type: ignore[assignment]

    # Graph structure
    self.nodes: dict[str, Node] = {}
    self.edges: list[Edge] = []
    self.entry_point: str | None = None

    # Services
    self._publisher: BasePublisher | None = publisher
    self._id_generator: BaseIDGenerator = id_generator
    self._context_manager: BaseContextManager[StateT] | None = context_manager
    self.thread_name_generator = thread_name_generator
    # save container for dependency injection
    # if any container is passed then we will activate that
    # otherwise we can skip it and use the default one
    if container is None:
        self._container = InjectQ.get_instance()
        logger.debug("No container provided, using global singleton instance")
    else:
        logger.debug("Using provided dependency container instance")
        self._container = container
        self._container.activate()

    # Register task_manager, for async tasks
    # This will be used to run background tasks
    self._task_manager = BackgroundTaskManager()

    # now setup the graph
    self._setup()

    # Add START and END nodes (accept full node signature including dependencies)
    logger.debug("Adding default START and END nodes")
    self.nodes[START] = Node(START, lambda state, config, **deps: state, self._publisher)  # type: ignore
    self.nodes[END] = Node(END, lambda state, config, **deps: state, self._publisher)
    logger.debug("StateGraph initialized with %d nodes", len(self.nodes))
add_conditional_edges
add_conditional_edges(from_node, condition, path_map=None)

Add conditional routing between nodes based on runtime evaluation.

Creates dynamic routing logic where the next node is determined by evaluating a condition function against the current state. This enables complex branching logic, decision trees, and adaptive workflow routing.

Parameters:

Name Type Description Default
from_node
str

Name of the source node where the condition is evaluated.

required
condition
Callable

Callable function that takes the current AgentState and returns a value used for routing decisions. Should be deterministic and side-effect free.

required
path_map
dict[str, str] | None

Optional dictionary mapping condition results to destination nodes. If provided, the condition's return value is looked up in this mapping. If None, the condition should return the destination node name directly.

None

Returns:

Name Type Description
StateGraph StateGraph

The graph instance for method chaining.

Raises:

Type Description
ValueError

If the condition function or path_map configuration is invalid.

Example
# Direct routing - condition returns node name
def route_by_priority(state):
    priority = state.data.get("priority", "normal")
    return "urgent_handler" if priority == "high" else "normal_handler"


graph.add_conditional_edges("classifier", route_by_priority)


# Mapped routing - condition result mapped to nodes
def get_category(state):
    return state.data.get("category", "default")


category_map = {
    "finance": "finance_processor",
    "legal": "legal_processor",
    "default": "general_processor",
}
graph.add_conditional_edges("categorizer", get_category, category_map)
Note

The condition function receives the current AgentState and should return consistent results for the same state. If using path_map, ensure the condition's return values match the map keys exactly.

Source code in pyagenity/graph/state_graph.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def add_conditional_edges(
    self,
    from_node: str,
    condition: Callable,
    path_map: dict[str, str] | None = None,
) -> "StateGraph":
    """Add conditional routing between nodes based on runtime evaluation.

    Creates dynamic routing logic where the next node is determined by evaluating
    a condition function against the current state. This enables complex branching
    logic, decision trees, and adaptive workflow routing.

    Args:
        from_node: Name of the source node where the condition is evaluated.
        condition: Callable function that takes the current AgentState and returns
            a value used for routing decisions. Should be deterministic and
            side-effect free.
        path_map: Optional dictionary mapping condition results to destination nodes.
            If provided, the condition's return value is looked up in this mapping.
            If None, the condition should return the destination node name directly.

    Returns:
        StateGraph: The graph instance for method chaining.

    Raises:
        ValueError: If the condition function or path_map configuration is invalid.

    Example:
        ```python
        # Direct routing - condition returns node name
        def route_by_priority(state):
            priority = state.data.get("priority", "normal")
            return "urgent_handler" if priority == "high" else "normal_handler"


        graph.add_conditional_edges("classifier", route_by_priority)


        # Mapped routing - condition result mapped to nodes
        def get_category(state):
            return state.data.get("category", "default")


        category_map = {
            "finance": "finance_processor",
            "legal": "legal_processor",
            "default": "general_processor",
        }
        graph.add_conditional_edges("categorizer", get_category, category_map)
        ```

    Note:
        The condition function receives the current AgentState and should return
        consistent results for the same state. If using path_map, ensure the
        condition's return values match the map keys exactly.
    """
    """Add conditional edges from a node based on a condition function.

    Creates edges that are traversed based on the result of a condition
    function. The condition function receives the current state and should
    return a value that determines which edge to follow.

    Args:
        from_node: Name of the source node.
        condition: Function that evaluates the current state and returns
            a value to determine the next node.
        path_map: Optional mapping from condition results to target nodes.
            If provided, creates multiple conditional edges. If None,
            creates a single conditional edge.

    Returns:
        StateGraph: The graph instance for method chaining.

    Example:
        >>> def route_condition(state):
        ...     return "success" if state.success else "failure"
        >>> graph.add_conditional_edges(
        ...     "processor",
        ...     route_condition,
        ...     {"success": "next_step", "failure": "error_handler"},
        ... )
    """
    # Create edges based on possible returns from condition function
    logger.debug(
        "Node '%s' adding conditional edges with path_map: %s",
        from_node,
        path_map,
    )
    if path_map:
        logger.debug(
            "Node '%s' adding conditional edges with path_map: %s", from_node, path_map
        )
        for condition_result, target_node in path_map.items():
            edge = Edge(from_node, target_node, condition)
            edge.condition_result = condition_result
            self.edges.append(edge)
    else:
        # Single conditional edge
        logger.debug("Node '%s' adding single conditional edge", from_node)
        self.edges.append(Edge(from_node, "", condition))
    return self
add_edge
add_edge(from_node, to_node)

Add a static edge between two nodes.

Creates a direct connection from one node to another. If the source node is START, the target node becomes the entry point for the graph.

Parameters:

Name Type Description Default
from_node
str

Name of the source node.

required
to_node
str

Name of the target node.

required

Returns:

Name Type Description
StateGraph StateGraph

The graph instance for method chaining.

Example

graph.add_edge("node1", "node2") graph.add_edge(START, "entry_node") # Sets entry point

Source code in pyagenity/graph/state_graph.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def add_edge(
    self,
    from_node: str,
    to_node: str,
) -> "StateGraph":
    """Add a static edge between two nodes.

    Creates a direct connection from one node to another. If the source
    node is START, the target node becomes the entry point for the graph.

    Args:
        from_node: Name of the source node.
        to_node: Name of the target node.

    Returns:
        StateGraph: The graph instance for method chaining.

    Example:
        >>> graph.add_edge("node1", "node2")
        >>> graph.add_edge(START, "entry_node")  # Sets entry point
    """
    logger.debug("Adding edge from '%s' to '%s'", from_node, to_node)
    # Set entry point if edge is from START
    if from_node == START:
        self.entry_point = to_node
        logger.info("Set entry point to '%s'", to_node)
    self.edges.append(Edge(from_node, to_node))
    logger.debug("Added edge (total edges: %d)", len(self.edges))
    return self
add_node
add_node(name_or_func, func=None)

Add a node to the graph.

This method supports two calling patterns: 1. Pass a callable as the first argument (name inferred from function name) 2. Pass a name string and callable/ToolNode as separate arguments

Parameters:

Name Type Description Default
name_or_func
str | Callable

Either the node name (str) or a callable function. If callable, the function name will be used as the node name.

required
func
Union[Callable, ToolNode, None]

The function or ToolNode to execute. Required if name_or_func is a string, ignored if name_or_func is callable.

None

Returns:

Name Type Description
StateGraph StateGraph

The graph instance for method chaining.

Raises:

Type Description
ValueError

If invalid arguments are provided.

Example
Method 1: Function name inferred

graph.add_node(my_function)

Method 2: Explicit name and function

graph.add_node("process", my_function)

Source code in pyagenity/graph/state_graph.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def add_node(
    self,
    name_or_func: str | Callable,
    func: Union[Callable, "ToolNode", None] = None,
) -> "StateGraph":
    """Add a node to the graph.

    This method supports two calling patterns:
    1. Pass a callable as the first argument (name inferred from function name)
    2. Pass a name string and callable/ToolNode as separate arguments

    Args:
        name_or_func: Either the node name (str) or a callable function.
            If callable, the function name will be used as the node name.
        func: The function or ToolNode to execute. Required if name_or_func
            is a string, ignored if name_or_func is callable.

    Returns:
        StateGraph: The graph instance for method chaining.

    Raises:
        ValueError: If invalid arguments are provided.

    Example:
        >>> # Method 1: Function name inferred
        >>> graph.add_node(my_function)
        >>> # Method 2: Explicit name and function
        >>> graph.add_node("process", my_function)
    """
    if callable(name_or_func) and func is None:
        # Function passed as first argument
        name = name_or_func.__name__
        func = name_or_func
        logger.debug("Adding node '%s' with inferred name from function", name)
    elif isinstance(name_or_func, str) and (callable(func) or isinstance(func, ToolNode)):
        # Name and function passed separately
        name = name_or_func
        logger.debug(
            "Adding node '%s' with explicit name and %s",
            name,
            "ToolNode" if isinstance(func, ToolNode) else "callable",
        )
    else:
        error_msg = "Invalid arguments for add_node"
        logger.error(error_msg)
        raise ValueError(error_msg)

    self.nodes[name] = Node(name, func)
    logger.info("Added node '%s' to graph (total nodes: %d)", name, len(self.nodes))
    return self
compile
compile(checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())

Compile the graph for execution.

Parameters:

Name Type Description Default
checkpointer
BaseCheckpointer[StateT] | None

Checkpointer for state persistence

None
store
BaseStore | None

Store for additional data

None
debug

Enable debug mode

required
interrupt_before
list[str] | None

List of node names to interrupt before execution

None
interrupt_after
list[str] | None

List of node names to interrupt after execution

None
callback_manager
CallbackManager

Callback manager for executing hooks

CallbackManager()
Source code in pyagenity/graph/state_graph.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def compile(
    self,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> "CompiledGraph[StateT]":
    """Compile the graph for execution.

    Args:
        checkpointer: Checkpointer for state persistence
        store: Store for additional data
        debug: Enable debug mode
        interrupt_before: List of node names to interrupt before execution
        interrupt_after: List of node names to interrupt after execution
        callback_manager: Callback manager for executing hooks
    """
    logger.info(
        "Compiling graph with %d nodes, %d edges, entry_point='%s'",
        len(self.nodes),
        len(self.edges),
        self.entry_point,
    )
    logger.debug(
        "Compile options: interrupt_before=%s, interrupt_after=%s",
        interrupt_before,
        interrupt_after,
    )

    if not self.entry_point:
        error_msg = "No entry point set. Use set_entry_point() or add an edge from START."
        logger.error(error_msg)
        raise GraphError(error_msg)

    # Validate graph structure
    logger.debug("Validating graph structure")
    self._validate_graph()
    logger.debug("Graph structure validated successfully")

    # Validate interrupt node names
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    all_interrupt_nodes = set(interrupt_before + interrupt_after)
    invalid_nodes = all_interrupt_nodes - set(self.nodes.keys())
    if invalid_nodes:
        error_msg = f"Invalid interrupt nodes: {invalid_nodes}. Must be existing node names."
        logger.error(error_msg)
        raise GraphError(error_msg)

    self.compiled = True
    logger.info("Graph compilation completed successfully")
    # Import here to avoid circular import at module import time
    # Now update Checkpointer
    if checkpointer is None:
        from pyagenity.checkpointer import InMemoryCheckpointer

        checkpointer = InMemoryCheckpointer[StateT]()
        logger.debug("No checkpointer provided, using InMemoryCheckpointer")

    # Import the CompiledGraph class
    from .compiled_graph import CompiledGraph

    # Setup dependencies
    self._container.bind_instance(
        BaseCheckpointer,
        checkpointer,
        allow_concrete=True,
    )  # not null as we set default
    self._container.bind_instance(
        BaseStore,
        store,
        allow_none=True,
        allow_concrete=True,
    )
    self._container.bind_instance(
        CallbackManager,
        callback_manager,
        allow_concrete=True,
    )  # not null as we set default
    self._container.bind("interrupt_before", interrupt_before)
    self._container.bind("interrupt_after", interrupt_after)
    self._container.bind_instance(StateGraph, self)

    app = CompiledGraph(
        state=self._state,
        interrupt_after=interrupt_after,
        interrupt_before=interrupt_before,
        state_graph=self,
        checkpointer=checkpointer,
        publisher=self._publisher,
        store=store,
        task_manager=self._task_manager,
    )

    self._container.bind(CompiledGraph, app)
    # Compile the Graph, so it will optimize the dependency graph
    self._container.compile()
    return app
set_entry_point
set_entry_point(node_name)

Set the entry point for the graph.

Source code in pyagenity/graph/state_graph.py
381
382
383
384
385
386
def set_entry_point(self, node_name: str) -> "StateGraph":
    """Set the entry point for the graph."""
    self.entry_point = node_name
    self.add_edge(START, node_name)
    logger.info("Set entry point to '%s'", node_name)
    return self

Functions