Skip to content

Agent

Modules:

Name Description
branch_join
deep_research
guarded
map_reduce
network
plan_act_reflect
rag
react
router
sequential
supervisor_team
swarm

Classes:

Name Description
BranchJoinAgent

Execute multiple branches then join.

DeepResearchAgent

Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

GuardedAgent

Validate output and repair until valid or attempts exhausted.

MapReduceAgent

Map over items then reduce.

NetworkAgent

Network pattern: define arbitrary node set and routing policies.

PlanActReflectAgent

Plan -> Act -> Reflect looping agent.

RAGAgent

Simple RAG: retrieve -> synthesize; optional follow-up.

ReactAgent
RouterAgent

A configurable router-style agent.

SequentialAgent

A simple sequential agent that executes a fixed pipeline of nodes.

SupervisorTeamAgent

Supervisor routes tasks to worker nodes and aggregates results.

SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Attributes

__all__ module-attribute

__all__ = ['BranchJoinAgent', 'DeepResearchAgent', 'GuardedAgent', 'MapReduceAgent', 'NetworkAgent', 'PlanActReflectAgent', 'RAGAgent', 'ReactAgent', 'RouterAgent', 'SequentialAgent', 'SupervisorTeamAgent', 'SwarmAgent']

Classes

BranchJoinAgent

Execute multiple branches then join.

Note: This prebuilt models branches sequentially (not true parallel execution). For each provided branch node, we add edges branch_i -> JOIN. The JOIN node decides whether more branches remain or END. A more advanced version could use BackgroundTaskManager for concurrency.

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/branch_join.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class BranchJoinAgent[StateT: AgentState]:
    """Execute multiple branches then join.

    Note: This prebuilt models branches sequentially (not true parallel execution).
    For each provided branch node, we add edges branch_i -> JOIN. The JOIN node
    decides whether more branches remain or END. A more advanced version could
    use BackgroundTaskManager for concurrency.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        branches: dict[str, Callable | tuple[Callable, str]],
        join_node: Callable | tuple[Callable, str],
        next_branch_condition: Callable | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not branches:
            raise ValueError("branches must be a non-empty dict of name -> callable/tuple")

        # Add branch nodes
        branch_names = []
        for key, fn in branches.items():
            if isinstance(fn, tuple):
                branch_func, branch_name = fn
                if not callable(branch_func):
                    raise ValueError(f"Branch '{key}'[0] must be callable")
            else:
                branch_func = fn
                branch_name = key
                if not callable(branch_func):
                    raise ValueError(f"Branch '{key}' must be callable")
            self._graph.add_node(branch_name, branch_func)
            branch_names.append(branch_name)

        # Handle join_node
        if isinstance(join_node, tuple):
            join_func, join_name = join_node
            if not callable(join_func):
                raise ValueError("join_node[0] must be callable")
        else:
            join_func = join_node
            join_name = "JOIN"
            if not callable(join_func):
                raise ValueError("join_node must be callable")
        self._graph.add_node(join_name, join_func)

        # Wire branches to JOIN
        for name in branch_names:
            self._graph.add_edge(name, join_name)

        # Entry: first branch
        first = branch_names[0]
        self._graph.set_entry_point(first)

        # Decide next branch or END after join
        if next_branch_condition is None:
            # default: END after join
            def _cond(_: AgentState) -> str:
                return END

            next_branch_condition = _cond

        # next_branch_condition returns a branch name or END
        path_map = {k: k for k in branch_names}
        path_map[END] = END
        self._graph.add_conditional_edges(join_name, next_branch_condition, path_map)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/branch_join.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator | None = None,
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that ``compile`` populates.

    Every argument is forwarded to ``StateGraph``. When ``id_generator`` is
    omitted, a ``DefaultIDGenerator`` is created for this agent instead of
    sharing a single instance built when the function was defined.
    """
    if id_generator is None:
        id_generator = DefaultIDGenerator()
    self._graph = StateGraph[StateT](
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
compile
compile(branches, join_node, next_branch_condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/branch_join.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def compile(
    self,
    branches: dict[str, Callable | tuple[Callable, str]],
    join_node: Callable | tuple[Callable, str],
    next_branch_condition: Callable | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager | None = None,
) -> CompiledGraph:
    """Wire the branch nodes and the join node, then compile the graph.

    Args:
        branches: Maps a key to either a callable (the key is the node
            name) or a ``(callable, node_name)`` tuple. The first entry
            becomes the entry point.
        join_node: A callable (node name ``"JOIN"``) or a
            ``(callable, node_name)`` tuple. Every branch gets an edge to it.
        next_branch_condition: Called after the join node; must return a
            branch node name or ``END``. Defaults to always returning ``END``.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``; a new
            ``CallbackManager`` is created when omitted.

    Returns:
        Whatever ``StateGraph.compile`` returns.

    Raises:
        ValueError: If ``branches`` is empty, a node is not callable, or
            two nodes (branches or join) resolve to the same name.
    """
    if not branches:
        raise ValueError("branches must be a non-empty dict of name -> callable/tuple")

    # Resolve and validate everything first so that a bad argument cannot
    # leave the graph half-populated.
    resolved = []
    seen = set()
    for key, spec in branches.items():
        if isinstance(spec, tuple):
            branch_func, branch_name = spec
            if not callable(branch_func):
                raise ValueError(f"Branch '{key}'[0] must be callable")
        else:
            branch_func, branch_name = spec, key
            if not callable(branch_func):
                raise ValueError(f"Branch '{key}' must be callable")
        if branch_name in seen:
            raise ValueError(f"Duplicate branch node name '{branch_name}'")
        seen.add(branch_name)
        resolved.append((branch_name, branch_func))

    if isinstance(join_node, tuple):
        join_func, join_name = join_node
        if not callable(join_func):
            raise ValueError("join_node[0] must be callable")
    else:
        join_func, join_name = join_node, "JOIN"
        if not callable(join_func):
            raise ValueError("join_node must be callable")
    if join_name in seen:
        # A shared name would turn the branch -> join edge into a self-loop.
        raise ValueError(f"join node name '{join_name}' collides with a branch name")

    if next_branch_condition is None:
        # Default: END after the first join.
        def _cond(_: AgentState) -> str:
            return END

        next_branch_condition = _cond

    if callback_manager is None:
        callback_manager = CallbackManager()

    # Nodes: branches first (in insertion order), then the join node.
    branch_names = []
    for branch_name, branch_func in resolved:
        self._graph.add_node(branch_name, branch_func)
        branch_names.append(branch_name)
    self._graph.add_node(join_name, join_func)

    # Every branch flows into the join node.
    for name in branch_names:
        self._graph.add_edge(name, join_name)

    # Entry: first branch.
    self._graph.set_entry_point(branch_names[0])

    # next_branch_condition returns a branch name or END.
    path_map = {name: name for name in branch_names}
    path_map[END] = END
    self._graph.add_conditional_edges(join_name, next_branch_condition, path_map)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

DeepResearchAgent

Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

This agent mirrors modern deep-research patterns inspired by DeerFlow and Tongyi DeepResearch: plan tasks, use tools to research, synthesize findings, critique gaps and iterate a bounded number of times.

Nodes:
- PLAN: Decompose the problem and propose search/tool actions; may include tool calls
- RESEARCH: ToolNode that executes search/browse/calc/etc. tools
- SYNTHESIZE: Aggregate findings and draft a coherent report or partial answer
- CRITIQUE: Identify gaps, contradictions, or follow-ups; can request more tools

Routing:
- PLAN -> conditional(_route_after_plan):
    {"RESEARCH": RESEARCH, "SYNTHESIZE": SYNTHESIZE, END: END}
- RESEARCH -> SYNTHESIZE
- SYNTHESIZE -> CRITIQUE
- CRITIQUE -> conditional(_route_after_critique): {"RESEARCH": RESEARCH, END: END}

Iteration Control:
- Uses the following execution_meta.internal_data keys:
    - dr_max_iters (int): maximum number of critique→research loops (default 2)
    - dr_iters (int): current loop count (auto-updated)
    - dr_heavy_mode (bool): if True, bias towards one more loop when the critique suggests it

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/deep_research.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class DeepResearchAgent[StateT: AgentState]:
    """Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

    This agent mirrors modern deep-research patterns inspired by DeerFlow and
    Tongyi DeepResearch: plan tasks, use tools to research, synthesize findings,
    critique gaps and iterate a bounded number of times.

    Nodes:
    - PLAN: Decompose problem, propose search/tool actions; may include tool calls
    - RESEARCH: ToolNode executes search/browse/calc/etc tools
    - SYNTHESIZE: Aggregate and draft a coherent report or partial answer
    - CRITIQUE: Identify gaps, contradictions, or follow-ups; can request more tools

        Routing:
        - PLAN -> conditional(_route_after_plan):
            {"RESEARCH": RESEARCH, "SYNTHESIZE": SYNTHESIZE, END: END}
    - RESEARCH -> SYNTHESIZE
    - SYNTHESIZE -> CRITIQUE
    - CRITIQUE -> conditional(_route_after_critique): {"RESEARCH": RESEARCH, END: END}

    Iteration Control:
    - Uses execution_meta.internal_data keys:
        dr_max_iters (int): maximum critique→research loops (default 2)
        dr_iters (int): current loop count (auto-updated)
        dr_heavy_mode (bool): if True, bias towards one more loop when critique suggests
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
        max_iters: int = 2,
        heavy_mode: bool = False,
    ):
        # initialize graph
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )
        # seed default internal config on prototype state
        # Note: These values will be copied to new state at invoke time.
        exec_meta: ExecutionState = self._graph._state.execution_meta
        exec_meta.internal_data.setdefault("dr_max_iters", max(0, int(max_iters)))
        exec_meta.internal_data.setdefault("dr_iters", 0)
        exec_meta.internal_data.setdefault("dr_heavy_mode", bool(heavy_mode))

    def compile(  # noqa: PLR0912
        self,
        plan_node: Callable | tuple[Callable, str],
        research_tool_node: ToolNode | tuple[ToolNode, str],
        synthesize_node: Callable | tuple[Callable, str],
        critique_node: Callable | tuple[Callable, str],
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle plan_node
        if isinstance(plan_node, tuple):
            plan_func, plan_name = plan_node
            if not callable(plan_func):
                raise ValueError("plan_node[0] must be callable")
        else:
            plan_func = plan_node
            plan_name = "PLAN"
            if not callable(plan_func):
                raise ValueError("plan_node must be callable")

        # Handle research_tool_node
        if isinstance(research_tool_node, tuple):
            research_func, research_name = research_tool_node
            if not isinstance(research_func, ToolNode):
                raise ValueError("research_tool_node[0] must be a ToolNode")
        else:
            research_func = research_tool_node
            research_name = "RESEARCH"
            if not isinstance(research_func, ToolNode):
                raise ValueError("research_tool_node must be a ToolNode")

        # Handle synthesize_node
        if isinstance(synthesize_node, tuple):
            synthesize_func, synthesize_name = synthesize_node
            if not callable(synthesize_func):
                raise ValueError("synthesize_node[0] must be callable")
        else:
            synthesize_func = synthesize_node
            synthesize_name = "SYNTHESIZE"
            if not callable(synthesize_func):
                raise ValueError("synthesize_node must be callable")

        # Handle critique_node
        if isinstance(critique_node, tuple):
            critique_func, critique_name = critique_node
            if not callable(critique_func):
                raise ValueError("critique_node[0] must be callable")
        else:
            critique_func = critique_node
            critique_name = "CRITIQUE"
            if not callable(critique_func):
                raise ValueError("critique_node must be callable")

        # Add nodes
        self._graph.add_node(plan_name, plan_func)
        self._graph.add_node(research_name, research_func)
        self._graph.add_node(synthesize_name, synthesize_func)
        self._graph.add_node(critique_name, critique_func)

        # Edges
        self._graph.add_conditional_edges(
            plan_name,
            _route_after_plan,
            {research_name: research_name, synthesize_name: synthesize_name, END: END},
        )
        self._graph.add_edge(research_name, synthesize_name)
        self._graph.add_edge(synthesize_name, critique_name)
        self._graph.add_conditional_edges(
            critique_name,
            _route_after_critique,
            {research_name: research_name, END: END},
        )

        # Entry
        self._graph.set_entry_point(plan_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None, max_iters=2, heavy_mode=False)
Source code in pyagenity/prebuilt/agent/deep_research.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator | None = None,
    container: InjectQ | None = None,
    max_iters: int = 2,
    heavy_mode: bool = False,
):
    """Create the underlying ``StateGraph`` and seed the loop settings.

    Args:
        state: Forwarded to ``StateGraph``.
        context_manager: Forwarded to ``StateGraph``.
        publisher: Forwarded to ``StateGraph``.
        id_generator: Forwarded to ``StateGraph``; a new
            ``DefaultIDGenerator`` is created when omitted instead of
            sharing one instance built when the function was defined.
        container: Forwarded to ``StateGraph``.
        max_iters: Seed for ``dr_max_iters``; coerced with ``int`` and
            clamped to be non-negative.
        heavy_mode: Seed for ``dr_heavy_mode``; coerced with ``bool``.
    """
    if id_generator is None:
        id_generator = DefaultIDGenerator()
    # initialize graph
    self._graph = StateGraph[StateT](
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
    # seed default internal config on prototype state
    # Note: These values will be copied to new state at invoke time.
    # setdefault: keys already present in the state's internal_data win
    # over the constructor arguments.
    exec_meta: ExecutionState = self._graph._state.execution_meta
    exec_meta.internal_data.setdefault("dr_max_iters", max(0, int(max_iters)))
    exec_meta.internal_data.setdefault("dr_iters", 0)
    exec_meta.internal_data.setdefault("dr_heavy_mode", bool(heavy_mode))
compile
compile(plan_node, research_tool_node, synthesize_node, critique_node, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/deep_research.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def compile(
    self,
    plan_node: Callable | tuple[Callable, str],
    research_tool_node: ToolNode | tuple[ToolNode, str],
    synthesize_node: Callable | tuple[Callable, str],
    critique_node: Callable | tuple[Callable, str],
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager | None = None,
) -> CompiledGraph:
    """Add the four nodes, wire the routing and compile the graph.

    Each node argument is either the node itself (default names ``"PLAN"``,
    ``"RESEARCH"``, ``"SYNTHESIZE"``, ``"CRITIQUE"``) or a
    ``(node, node_name)`` tuple.

    Args:
        plan_node: Callable used as the entry point.
        research_tool_node: Must be a ``ToolNode`` instance.
        synthesize_node: Callable run after research.
        critique_node: Callable run after synthesis.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``; a new
            ``CallbackManager`` is created when omitted.

    Returns:
        Whatever ``StateGraph.compile`` returns.

    Raises:
        ValueError: If a node has the wrong type or two nodes share a name.
    """

    def _resolve(spec, param, default_name, is_valid, requirement):
        # Split ``node`` / ``(node, name)`` into ``(node, name)`` and validate.
        if isinstance(spec, tuple):
            node, name = spec
            subject = f"{param}[0]"
        else:
            node, name = spec, default_name
            subject = param
        if not is_valid(node):
            raise ValueError(f"{subject} must be {requirement}")
        return node, name

    plan_func, plan_name = _resolve(plan_node, "plan_node", "PLAN", callable, "callable")
    research_func, research_name = _resolve(
        research_tool_node,
        "research_tool_node",
        "RESEARCH",
        lambda node: isinstance(node, ToolNode),
        "a ToolNode",
    )
    synthesize_func, synthesize_name = _resolve(
        synthesize_node, "synthesize_node", "SYNTHESIZE", callable, "callable"
    )
    critique_func, critique_name = _resolve(
        critique_node, "critique_node", "CRITIQUE", callable, "callable"
    )

    names = [plan_name, research_name, synthesize_name, critique_name]
    if len(set(names)) != len(names):
        raise ValueError(f"node names must be unique, got {names}")

    if callback_manager is None:
        callback_manager = CallbackManager()

    # Add nodes
    self._graph.add_node(plan_name, plan_func)
    self._graph.add_node(research_name, research_func)
    self._graph.add_node(synthesize_name, synthesize_func)
    self._graph.add_node(critique_name, critique_func)

    # Edges
    # NOTE(review): the path maps are keyed by the resolved node names. If
    # the routers return the fixed labels "RESEARCH"/"SYNTHESIZE", custom
    # node names will not be routable -- verify against the router helpers.
    self._graph.add_conditional_edges(
        plan_name,
        _route_after_plan,
        {research_name: research_name, synthesize_name: synthesize_name, END: END},
    )
    self._graph.add_edge(research_name, synthesize_name)
    self._graph.add_edge(synthesize_name, critique_name)
    self._graph.add_conditional_edges(
        critique_name,
        _route_after_critique,
        {research_name: research_name, END: END},
    )

    # Entry
    self._graph.set_entry_point(plan_name)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

GuardedAgent

Validate output and repair until valid or attempts exhausted.

Nodes:
- PRODUCE: main generation node
- REPAIR: correction node, run when validation fails

Edges:
- PRODUCE -> conditional(valid? END : REPAIR)
- REPAIR -> PRODUCE

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/guarded.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class GuardedAgent[StateT: AgentState]:
    """Validate output and repair until valid or attempts exhausted.

    Nodes:
    - PRODUCE: main generation node
    - REPAIR: correction node when validation fails

    Edges:
    PRODUCE -> conditional(valid? END : REPAIR)
    REPAIR -> PRODUCE
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        produce_node: Callable | tuple[Callable, str],
        repair_node: Callable | tuple[Callable, str],
        validator: Callable[[AgentState], bool],
        max_attempts: int = 2,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle produce_node
        if isinstance(produce_node, tuple):
            produce_func, produce_name = produce_node
            if not callable(produce_func):
                raise ValueError("produce_node[0] must be callable")
        else:
            produce_func = produce_node
            produce_name = "PRODUCE"
            if not callable(produce_func):
                raise ValueError("produce_node must be callable")

        # Handle repair_node
        if isinstance(repair_node, tuple):
            repair_func, repair_name = repair_node
            if not callable(repair_func):
                raise ValueError("repair_node[0] must be callable")
        else:
            repair_func = repair_node
            repair_name = "REPAIR"
            if not callable(repair_func):
                raise ValueError("repair_node must be callable")

        self._graph.add_node(produce_name, produce_func)
        self._graph.add_node(repair_name, repair_func)

        # produce -> END or REPAIR
        condition = _guard_condition_factory(validator, max_attempts)
        self._graph.add_conditional_edges(
            produce_name,
            condition,
            {repair_name: repair_name, END: END},
        )
        # repair -> produce
        self._graph.add_edge(repair_name, produce_name)

        self._graph.set_entry_point(produce_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/guarded.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator | None = None,
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that ``compile`` populates.

    Every argument is forwarded to ``StateGraph``. When ``id_generator`` is
    omitted, a ``DefaultIDGenerator`` is created for this agent instead of
    sharing a single instance built when the function was defined.
    """
    if id_generator is None:
        id_generator = DefaultIDGenerator()
    self._graph = StateGraph[StateT](
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
compile
compile(produce_node, repair_node, validator, max_attempts=2, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/guarded.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def compile(
    self,
    produce_node: Callable | tuple[Callable, str],
    repair_node: Callable | tuple[Callable, str],
    validator: Callable[[AgentState], bool],
    max_attempts: int = 2,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager | None = None,
) -> CompiledGraph:
    """Add the produce/repair nodes, wire the guard loop and compile.

    Args:
        produce_node: Callable (node name ``"PRODUCE"``) or a
            ``(callable, node_name)`` tuple; used as the entry point.
        repair_node: Callable (node name ``"REPAIR"``) or a
            ``(callable, node_name)`` tuple; always routes back to produce.
        validator: Passed to ``_guard_condition_factory`` to build the
            condition evaluated after the produce node.
        max_attempts: Passed to ``_guard_condition_factory`` together with
            ``validator``.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``; a new
            ``CallbackManager`` is created when omitted.

    Returns:
        Whatever ``StateGraph.compile`` returns.

    Raises:
        ValueError: If a node is not callable or both nodes share a name.
    """

    def _resolve(spec, param, default_name):
        # Split ``node`` / ``(node, name)`` into ``(node, name)`` and validate.
        if isinstance(spec, tuple):
            node, name = spec
            subject = f"{param}[0]"
        else:
            node, name = spec, default_name
            subject = param
        if not callable(node):
            raise ValueError(f"{subject} must be callable")
        return node, name

    produce_func, produce_name = _resolve(produce_node, "produce_node", "PRODUCE")
    repair_func, repair_name = _resolve(repair_node, "repair_node", "REPAIR")
    if produce_name == repair_name:
        raise ValueError(
            f"produce and repair nodes must have different names, got '{produce_name}'"
        )

    if callback_manager is None:
        callback_manager = CallbackManager()

    self._graph.add_node(produce_name, produce_func)
    self._graph.add_node(repair_name, repair_func)

    # produce -> END or REPAIR
    # NOTE(review): the path map is keyed by the resolved repair name. If
    # the generated condition returns the fixed label "REPAIR", a custom
    # repair name will not be routable -- verify against the factory.
    condition = _guard_condition_factory(validator, max_attempts)
    self._graph.add_conditional_edges(
        produce_name,
        condition,
        {repair_name: repair_name, END: END},
    )
    # repair -> produce
    self._graph.add_edge(repair_name, produce_name)

    self._graph.set_entry_point(produce_name)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

MapReduceAgent

Map over items then reduce.

Nodes:
- SPLIT: optional; prepares per-item tasks (or the state already contains the items)
- MAP: processes one item per iteration
- REDUCE: aggregates results and decides whether to END or continue

Compile requires:
- map_node: Callable | ToolNode
- reduce_node: Callable
- split_node: Callable | None
- condition: Callable[[AgentState], str]; returns "MAP" to continue or END

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/map_reduce.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class MapReduceAgent[StateT: AgentState]:
    """Map over items then reduce.

    Nodes:
    - SPLIT: optional, prepares per-item tasks (or state already contains items)
    - MAP: processes one item per iteration
    - REDUCE: aggregates results and decides END or continue

    Compile requires:
      map_node: Callable|ToolNode
      reduce_node: Callable
      split_node: Callable | None
      condition: Callable[[AgentState], str] returns "MAP" to continue or END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        map_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
        reduce_node: Callable | tuple[Callable, str],
        split_node: Callable | tuple[Callable, str] | None = None,
        condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle split_node
        split_name = "SPLIT"
        if split_node:
            if isinstance(split_node, tuple):
                split_func, split_name = split_node
                if not callable(split_func):
                    raise ValueError("split_node[0] must be callable")
            else:
                split_func = split_node
                split_name = "SPLIT"
                if not callable(split_func):
                    raise ValueError("split_node must be callable")
            self._graph.add_node(split_name, split_func)

        # Handle map_node
        if isinstance(map_node, tuple):
            map_func, map_name = map_node
            if not (callable(map_func) or isinstance(map_func, ToolNode)):
                raise ValueError("map_node[0] must be callable or ToolNode")
        else:
            map_func = map_node
            map_name = "MAP"
            if not (callable(map_func) or isinstance(map_func, ToolNode)):
                raise ValueError("map_node must be callable or ToolNode")
        self._graph.add_node(map_name, map_func)

        # Handle reduce_node
        if isinstance(reduce_node, tuple):
            reduce_func, reduce_name = reduce_node
            if not callable(reduce_func):
                raise ValueError("reduce_node[0] must be callable")
        else:
            reduce_func = reduce_node
            reduce_name = "REDUCE"
            if not callable(reduce_func):
                raise ValueError("reduce_node must be callable")
        self._graph.add_node(reduce_name, reduce_func)

        # Edges
        if split_node:
            self._graph.add_edge(split_name, map_name)
            self._graph.set_entry_point(split_name)
        else:
            self._graph.set_entry_point(map_name)

        self._graph.add_edge(map_name, reduce_name)

        # Continue mapping or finish
        if condition is None:
            # default: finish after one map-reduce
            def _cond(_: AgentState) -> str:
                return END

            condition = _cond

        self._graph.add_conditional_edges(reduce_name, condition, {map_name: map_name, END: END})

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/map_reduce.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` that this agent wires up.

    Every argument is forwarded unchanged to ``StateGraph[StateT]`` and the
    resulting graph is stored on ``self._graph``.

    Args:
        state: Optional state instance handed to the graph.
        context_manager: Optional context manager handed to the graph.
        publisher: Optional publisher handed to the graph.
        id_generator: ID generator handed to the graph.
        container: Optional ``InjectQ`` container handed to the graph.
    """
    # NOTE(review): ``DefaultIDGenerator()`` in the signature is evaluated once,
    # at definition time, so every caller that omits ``id_generator`` receives
    # the same instance -- confirm that sharing is intended.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(map_node, reduce_node, split_node=None, condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/map_reduce.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def compile(
    self,
    map_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
    reduce_node: Callable | tuple[Callable, str],
    split_node: Callable | tuple[Callable, str] | None = None,
    condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the split/map/reduce nodes, connect them, and compile the graph.

    Wiring produced:
        (split ->) map -> reduce -> condition -> map | END

    Args:
        map_node: Callable or ToolNode, or a ``(node, name)`` tuple. The node
            is named ``"MAP"`` when no name is given.
        reduce_node: Callable, or a ``(callable, name)`` tuple. Named
            ``"REDUCE"`` when no name is given.
        split_node: Optional callable, or ``(callable, name)`` tuple. Named
            ``"SPLIT"`` when no name is given. When truthy it becomes the
            entry point; otherwise the map node is the entry point.
        condition: Decision function attached to the reduce node with a path
            map containing the map node name and ``END``. When ``None`` a
            function that always returns ``END`` is used.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Returns:
        Whatever the underlying graph's ``compile`` returns.

    Raises:
        ValueError: If a supplied node is not of an accepted kind.
    """
    # NOTE(review): the ``CallbackManager()`` default is created once at
    # definition time and shared by all calls that omit ``callback_manager``.

    def _resolve(spec, default_name, label, *, allow_tool_node=False):
        """Split ``spec`` into ``(func, name)`` and validate ``func``."""
        paired = isinstance(spec, tuple)
        func, name = spec if paired else (spec, default_name)
        acceptable = callable(func) or (allow_tool_node and isinstance(func, ToolNode))
        if not acceptable:
            subject = f"{label}[0]" if paired else label
            requirement = "callable or ToolNode" if allow_tool_node else "callable"
            raise ValueError(f"{subject} must be {requirement}")
        return func, name

    def _cond(_: AgentState) -> str:
        # Fallback decision: stop after a single map-reduce pass.
        return END

    graph = self._graph

    # Each node is validated and registered before the next one is examined.
    has_split = bool(split_node)
    split_name = "SPLIT"
    if has_split:
        split_func, split_name = _resolve(split_node, "SPLIT", "split_node")
        graph.add_node(split_name, split_func)

    map_func, map_name = _resolve(map_node, "MAP", "map_node", allow_tool_node=True)
    graph.add_node(map_name, map_func)

    reduce_func, reduce_name = _resolve(reduce_node, "REDUCE", "reduce_node")
    graph.add_node(reduce_name, reduce_func)

    # Static wiring: the split stage, when present, is the entry point.
    if has_split:
        graph.add_edge(split_name, map_name)
    graph.set_entry_point(split_name if has_split else map_name)
    graph.add_edge(map_name, reduce_name)

    # After reducing, either loop back to the map node or finish.
    if condition is None:
        condition = _cond
    path_map = {target: target for target in (map_name, END)}
    graph.add_conditional_edges(reduce_name, condition, path_map)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)

NetworkAgent

Network pattern: define arbitrary node set and routing policies.

  • Nodes can be callables or ToolNode.
  • Edges can be static or conditional via a router function per node.
  • Entry point is explicit.

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/network.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class NetworkAgent[StateT: AgentState]:
    """Network pattern: define arbitrary node set and routing policies.

    - Nodes can be callables or ToolNode.
    - Edges can be static or conditional via a router function per node.
    - Entry point is explicit.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        nodes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        entry: str,
        static_edges: list[tuple[str, str]] | None = None,
        conditional_edges: list[tuple[str, Callable[[AgentState], str], dict[str, str]]]
        | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not nodes:
            raise ValueError("nodes must be a non-empty dict")

        # Add nodes
        for key, fn in nodes.items():
            if isinstance(fn, tuple):
                func, name = fn
            else:
                func, name = fn, key
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Node '{key}' must be a callable or ToolNode")
            self._graph.add_node(name, func)

        if entry not in self._graph.nodes:
            raise ValueError(f"entry node '{entry}' must be present in nodes")

        # Static edges
        for src, dst in static_edges or []:
            if src not in self._graph.nodes or dst not in self._graph.nodes:
                raise ValueError(f"Invalid static edge {src}->{dst}: unknown node")
            self._graph.add_edge(src, dst)

        # Conditional edges
        for src, cond, pmap in conditional_edges or []:
            if src not in self._graph.nodes:
                raise ValueError(f"Invalid conditional edge: unknown node '{src}'")
            self._graph.add_conditional_edges(src, cond, pmap)

        # Note: callers may include END in path maps; not enforced here.

        self._graph.set_entry_point(entry)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/network.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` that this agent wires up.

    Every argument is forwarded unchanged to ``StateGraph[StateT]`` and the
    resulting graph is stored on ``self._graph``.

    Args:
        state: Optional state instance handed to the graph.
        context_manager: Optional context manager handed to the graph.
        publisher: Optional publisher handed to the graph.
        id_generator: ID generator handed to the graph.
        container: Optional ``InjectQ`` container handed to the graph.
    """
    # NOTE(review): ``DefaultIDGenerator()`` in the signature is evaluated once,
    # at definition time, so every caller that omits ``id_generator`` receives
    # the same instance -- confirm that sharing is intended.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(nodes, entry, static_edges=None, conditional_edges=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/network.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def compile(
    self,
    nodes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    entry: str,
    static_edges: list[tuple[str, str]] | None = None,
    conditional_edges: list[tuple[str, Callable[[AgentState], str], dict[str, str]]]
    | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register nodes and edges, set the entry point, and compile.

    Args:
        nodes: Mapping of key to node. A value may be a callable/ToolNode
            (registered under its key) or a ``(node, name)`` tuple
            (registered under ``name`` instead of the key).
        entry: Name of the entry node; must be a registered node name.
        static_edges: Optional ``(src, dst)`` pairs; both ends must be known
            to the graph.
        conditional_edges: Optional ``(src, router, path_map)`` triples; only
            ``src`` is checked against the graph's nodes.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Returns:
        Whatever the underlying graph's ``compile`` returns.

    Raises:
        ValueError: If ``nodes`` is empty, a node is neither callable nor a
            ToolNode, ``entry`` is unknown, or an edge references an unknown
            node.
    """
    if not nodes:
        raise ValueError("nodes must be a non-empty dict")

    graph = self._graph

    # Register every node; a tuple value carries its own node name.
    for key, spec in nodes.items():
        func, name = spec if isinstance(spec, tuple) else (spec, key)
        if not callable(func) and not isinstance(func, ToolNode):
            raise ValueError(f"Node '{key}' must be a callable or ToolNode")
        graph.add_node(name, func)

    if entry not in graph.nodes:
        raise ValueError(f"entry node '{entry}' must be present in nodes")

    # Static edges: both endpoints have to be known to the graph.
    for src, dst in static_edges or []:
        if not (src in graph.nodes and dst in graph.nodes):
            raise ValueError(f"Invalid static edge {src}->{dst}: unknown node")
        graph.add_edge(src, dst)

    # Conditional edges: only the source is validated here; path-map targets
    # (which may include END) are passed through unchecked.
    for src, router, path_map in conditional_edges or []:
        if src not in graph.nodes:
            raise ValueError(f"Invalid conditional edge: unknown node '{src}'")
        graph.add_conditional_edges(src, router, path_map)

    graph.set_entry_point(entry)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)

PlanActReflectAgent

Plan -> Act -> Reflect looping agent.

Pattern

PLAN -> (condition) -> ACT | REFLECT | END
ACT -> REFLECT
REFLECT -> PLAN

Default condition (_should_act):
  • If last assistant message contains tool calls -> ACT
  • If last message is from a tool -> REFLECT
  • Else -> END

Provide a custom condition to override this heuristic and implement:
  • Budget / depth limiting
  • Confidence-based early stop
  • Dynamic branch selection (e.g., different tool nodes)

Parameters (constructor):
  • state: Optional initial state instance
  • context_manager: Custom context manager
  • publisher: Optional publisher for streaming / events
  • id_generator: ID generation strategy
  • container: InjectQ DI container

compile(...) arguments:
  • plan_node: Callable (state -> state). Produces next thought / tool requests
  • tool_node: ToolNode executing declared tools
  • reflect_node: Callable (state -> state). Consumes tool results & may adjust plan
  • condition: Optional Callable[[AgentState], str] returning next node name or END
  • checkpointer / store / interrupt_before / interrupt_after / callback_manager: Standard graph compilation options

Returns:

Type Description

CompiledGraph ready for invoke / ainvoke.

Notes
  • Node names can be customized via (callable, "NAME") tuples.
  • condition must return one of: tool_node_name, reflect_node_name, END.

Methods:

Name Description
__init__
compile

Compile the Plan-Act-Reflect loop.

Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class PlanActReflectAgent[StateT: AgentState]:
    """Plan -> Act -> Reflect looping agent.

    Pattern:
        PLAN -> (condition) -> ACT | REFLECT | END
        ACT -> REFLECT
        REFLECT -> PLAN

    Default condition (_should_act):
        - If last assistant message contains tool calls -> ACT
        - If last message is from a tool -> REFLECT
        - Else -> END

    Provide a custom condition to override this heuristic and implement:
        * Budget / depth limiting
        * Confidence-based early stop
        * Dynamic branch selection (e.g., different tool nodes)

    Parameters (constructor):
        state: Optional initial state instance
        context_manager: Custom context manager
        publisher: Optional publisher for streaming / events
        id_generator: ID generation strategy
        container: InjectQ DI container

    compile(...) arguments:
        plan_node: Callable (state -> state). Produces next thought / tool requests
        tool_node: ToolNode executing declared tools
        reflect_node: Callable (state -> state). Consumes tool results & may adjust plan
        condition: Optional Callable[[AgentState], str] returning next node name or END
        checkpointer/store/interrupt_before/interrupt_after/callback_manager:
            Standard graph compilation options

    Returns:
        CompiledGraph ready for invoke / ainvoke.

    Notes:
        - Node names can be customized via (callable, "NAME") tuples.
        - condition must return one of: tool_node_name, reflect_node_name, END.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        plan_node: Callable | tuple[Callable, str],
        tool_node: ToolNode | tuple[ToolNode, str],
        reflect_node: Callable | tuple[Callable, str],
        *,
        condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        """Compile the Plan-Act-Reflect loop.

        Args:
            plan_node: Callable or (callable, name)
            tool_node: ToolNode or (ToolNode, name)
            reflect_node: Callable or (callable, name)
            condition: Optional decision function. Defaults to internal heuristic.
            checkpointer/store/interrupt_* / callback_manager: Standard graph options.

        Returns:
            CompiledGraph
        """
        # PLAN
        if isinstance(plan_node, tuple):
            plan_func, plan_name = plan_node
            if not callable(plan_func):
                raise ValueError("plan_node[0] must be callable")
        else:
            plan_func = plan_node
            plan_name = "PLAN"
            if not callable(plan_func):
                raise ValueError("plan_node must be callable")

        # ACT
        if isinstance(tool_node, tuple):
            tool_func, tool_name = tool_node
            if not isinstance(tool_func, ToolNode):
                raise ValueError("tool_node[0] must be a ToolNode")
        else:
            tool_func = tool_node
            tool_name = "ACT"
            if not isinstance(tool_func, ToolNode):
                raise ValueError("tool_node must be a ToolNode")

        # REFLECT
        if isinstance(reflect_node, tuple):
            reflect_func, reflect_name = reflect_node
            if not callable(reflect_func):
                raise ValueError("reflect_node[0] must be callable")
        else:
            reflect_func = reflect_node
            reflect_name = "REFLECT"
            if not callable(reflect_func):
                raise ValueError("reflect_node must be callable")

        # Register nodes
        self._graph.add_node(plan_name, plan_func)
        self._graph.add_node(tool_name, tool_func)
        self._graph.add_node(reflect_name, reflect_func)

        # Decision
        decision_fn = condition or _should_act
        self._graph.add_conditional_edges(
            plan_name,
            decision_fn,
            {tool_name: tool_name, reflect_name: reflect_name, END: END},
        )

        # Loop edges
        self._graph.add_edge(tool_name, reflect_name)
        self._graph.add_edge(reflect_name, plan_name)

        # Entry
        self._graph.set_entry_point(plan_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` that this agent wires up.

    Every argument is forwarded unchanged to ``StateGraph[StateT]`` and the
    resulting graph is stored on ``self._graph``.

    Args:
        state: Optional state instance handed to the graph.
        context_manager: Optional context manager handed to the graph.
        publisher: Optional publisher handed to the graph.
        id_generator: ID generator handed to the graph.
        container: Optional ``InjectQ`` container handed to the graph.
    """
    # NOTE(review): ``DefaultIDGenerator()`` in the signature is evaluated once,
    # at definition time, so every caller that omits ``id_generator`` receives
    # the same instance -- confirm that sharing is intended.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(plan_node, tool_node, reflect_node, *, condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())

Compile the Plan-Act-Reflect loop.

Parameters:

Name Type Description Default
plan_node
Callable | tuple[Callable, str]

Callable or (callable, name)

required
tool_node
ToolNode | tuple[ToolNode, str]

ToolNode or (ToolNode, name)

required
reflect_node
Callable | tuple[Callable, str]

Callable or (callable, name)

required
condition
Callable[[AgentState], str] | None

Optional decision function. Defaults to internal heuristic.

None
checkpointer / store / interrupt_before / interrupt_after / callback_manager

Standard graph options (all optional).

None (callback_manager defaults to CallbackManager())

Returns:

Type Description
CompiledGraph

CompiledGraph

Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def compile(
    self,
    plan_node: Callable | tuple[Callable, str],
    tool_node: ToolNode | tuple[ToolNode, str],
    reflect_node: Callable | tuple[Callable, str],
    *,
    condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Compile the Plan-Act-Reflect loop.

    Args:
        plan_node: Callable or (callable, name); default name ``"PLAN"``.
        tool_node: ToolNode or (ToolNode, name); default name ``"ACT"``.
        reflect_node: Callable or (callable, name); default name
            ``"REFLECT"``.
        condition: Optional decision function attached to the plan node.
            Falls back to ``_should_act`` when not provided.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Returns:
        CompiledGraph

    Raises:
        ValueError: If a node is not of the required kind.
    """

    def _resolve(spec, default_name, label, is_valid, requirement):
        """Split ``spec`` into ``(func, name)`` and validate ``func``."""
        paired = isinstance(spec, tuple)
        func, name = spec if paired else (spec, default_name)
        if not is_valid(func):
            subject = f"{label}[0]" if paired else label
            raise ValueError(f"{subject} must be {requirement}")
        return func, name

    def _is_tool_node(candidate):
        return isinstance(candidate, ToolNode)

    # All three nodes are validated before anything is added to the graph.
    plan_func, plan_name = _resolve(plan_node, "PLAN", "plan_node", callable, "callable")
    tool_func, tool_name = _resolve(tool_node, "ACT", "tool_node", _is_tool_node, "a ToolNode")
    reflect_func, reflect_name = _resolve(
        reflect_node, "REFLECT", "reflect_node", callable, "callable"
    )

    graph = self._graph
    registrations = (
        (plan_name, plan_func),
        (tool_name, tool_func),
        (reflect_name, reflect_func),
    )
    for node_name, node_func in registrations:
        graph.add_node(node_name, node_func)

    # The plan node branches to ACT, REFLECT or END via the decision function.
    decision_fn = condition if condition else _should_act
    path_map = {target: target for target in (tool_name, reflect_name, END)}
    graph.add_conditional_edges(plan_name, decision_fn, path_map)

    # Fixed edges that close the loop back to the plan node.
    graph.add_edge(tool_name, reflect_name)
    graph.add_edge(reflect_name, plan_name)

    graph.set_entry_point(plan_name)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)

RAGAgent

Simple RAG: retrieve -> synthesize; optional follow-up.

Nodes:
  • RETRIEVE: uses a retriever (callable or ToolNode) to fetch context
  • SYNTHESIZE: LLM/composer builds an answer
  • Optional condition: loop back to RETRIEVE for follow-up queries; else END

Methods:

Name Description
__init__
compile
compile_advanced

Advanced RAG wiring with hybrid retrieval and optional stages.

Source code in pyagenity/prebuilt/agent/rag.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class RAGAgent[StateT: AgentState]:
    """Simple RAG: retrieve -> synthesize; optional follow-up.

    Nodes:
    - RETRIEVE: uses a retriever (callable or ToolNode) to fetch context
    - SYNTHESIZE: LLM/composer builds an answer
    - Optional condition: loop back to RETRIEVE for follow-up queries; else END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        retriever_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
        synthesize_node: Callable | tuple[Callable, str],
        followup_condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Nodes
        # Handle retriever_node
        if isinstance(retriever_node, tuple):
            retriever_func, retriever_name = retriever_node
            if not (callable(retriever_func) or isinstance(retriever_func, ToolNode)):
                raise ValueError("retriever_node[0] must be callable or ToolNode")
        else:
            retriever_func = retriever_node
            retriever_name = "RETRIEVE"
            if not (callable(retriever_func) or isinstance(retriever_func, ToolNode)):
                raise ValueError("retriever_node must be callable or ToolNode")

        # Handle synthesize_node
        if isinstance(synthesize_node, tuple):
            synthesize_func, synthesize_name = synthesize_node
            if not callable(synthesize_func):
                raise ValueError("synthesize_node[0] must be callable")
        else:
            synthesize_func = synthesize_node
            synthesize_name = "SYNTHESIZE"
            if not callable(synthesize_func):
                raise ValueError("synthesize_node must be callable")

        self._graph.add_node(retriever_name, retriever_func)  # type: ignore[arg-type]
        self._graph.add_node(synthesize_name, synthesize_func)

        # Edges
        self._graph.add_edge(retriever_name, synthesize_name)
        self._graph.set_entry_point(retriever_name)

        if followup_condition is None:
            # default: END after synthesize
            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        self._graph.add_conditional_edges(
            synthesize_name,
            followup_condition,
            {retriever_name: retriever_name, END: END},
        )

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    def compile_advanced(
        self,
        retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
        synthesize_node: Callable | tuple[Callable, str],
        options: dict | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        """Advanced RAG wiring with hybrid retrieval and optional stages.

        Chain:
          (QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ...
          -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond
        Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).
        """

        options = options or {}
        query_plan_node = options.get("query_plan")
        merger_node = options.get("merge")
        rerank_node = options.get("rerank")
        compress_node = options.get("compress")
        followup_condition = options.get("followup_condition")

        qname = self._add_optional_node(
            query_plan_node,
            default_name="QUERY_PLAN",
            label="query_plan",
        )

        # Add retrievers
        r_names = self._add_retriever_nodes(retriever_nodes)

        # Optional stages
        mname = self._add_optional_node(merger_node, default_name="MERGE", label="merge")
        rrname = self._add_optional_node(rerank_node, default_name="RERANK", label="rerank")
        cname = self._add_optional_node(
            compress_node,
            default_name="COMPRESS",
            label="compress",
        )

        # Synthesize
        sname = self._add_synthesize_node(synthesize_node)

        # Wire edges end-to-end and follow-up
        self._wire_advanced_edges(qname, r_names, mname, rrname, cname, sname, followup_condition)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    # ---- helpers ----
    def _add_optional_node(
        self,
        node: Callable | tuple[Callable, str] | None,
        *,
        default_name: str,
        label: str,
    ) -> str | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, default_name
        if not callable(func):
            raise ValueError(f"{label} node must be callable")
        self._graph.add_node(name, func)
        return name

    def _add_retriever_nodes(
        self,
        retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
    ) -> list[str]:
        if not retriever_nodes:
            raise ValueError("retriever_nodes must be non-empty")
        names: list[str] = []
        for idx, rn in enumerate(retriever_nodes):
            if isinstance(rn, tuple):
                rfunc, rname = rn
            else:
                rfunc, rname = rn, f"RETRIEVE_{idx + 1}"
            if not (callable(rfunc) or isinstance(rfunc, ToolNode)):
                raise ValueError("retriever must be callable or ToolNode")
            self._graph.add_node(rname, rfunc)  # type: ignore[arg-type]
            names.append(rname)
        return names

    def _add_synthesize_node(self, synthesize_node: Callable | tuple[Callable, str]) -> str:
        if isinstance(synthesize_node, tuple):
            sfunc, sname = synthesize_node
        else:
            sfunc, sname = synthesize_node, "SYNTHESIZE"
        if not callable(sfunc):
            raise ValueError("synthesize_node must be callable")
        self._graph.add_node(sname, sfunc)
        return sname

    def _wire_advanced_edges(
        self,
        qname: str | None,
        r_names: list[str],
        mname: str | None,
        rrname: str | None,
        cname: str | None,
        sname: str,
        followup_condition: Callable[[AgentState], str] | None = None,
    ) -> None:
        entry = qname or r_names[0]
        self._graph.set_entry_point(entry)
        if qname:
            self._graph.add_edge(qname, r_names[0])

        tail_target = rrname or cname or sname
        for i, rname in enumerate(r_names):
            is_last = i == len(r_names) - 1
            nxt = r_names[i + 1] if not is_last else tail_target
            if mname:
                self._graph.add_edge(rname, mname)
                self._graph.add_edge(mname, nxt)
            else:
                self._graph.add_edge(rname, nxt)

        # Tail wiring
        if rrname and cname:
            self._graph.add_edge(rrname, cname)
            self._graph.add_edge(cname, sname)
        elif rrname:
            self._graph.add_edge(rrname, sname)
        elif cname:
            self._graph.add_edge(cname, sname)

        # default follow-up to END
        if followup_condition is None:

            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        entry_node = qname or r_names[0]
        path_map = {entry_node: entry_node, END: END}
        self._graph.add_conditional_edges(sname, followup_condition, path_map)

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/rag.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that the compile methods populate.

    Every argument is forwarded unchanged to ``StateGraph``. The default
    ``id_generator`` instance is created once, when the function is defined,
    and is therefore shared by all calls that omit it.
    """
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(retriever_node, synthesize_node, followup_condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/rag.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def compile(
    self,
    retriever_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
    synthesize_node: Callable | tuple[Callable, str],
    followup_condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Build and compile the basic retrieve -> synthesize graph.

    Args:
        retriever_node: Callable or ToolNode, optionally paired with a node
            name as ``(node, name)``. The name defaults to ``"RETRIEVE"``.
        synthesize_node: Callable, optionally paired with a node name as
            ``(node, name)``. The name defaults to ``"SYNTHESIZE"``.
        followup_condition: Called after the synthesize node; its result is
            looked up in a path map holding the retriever name and END.
            When omitted, a condition that always returns END is used.
        checkpointer, store, interrupt_before, interrupt_after,
        callback_manager: Forwarded unchanged to ``StateGraph.compile``.

    Returns:
        The compiled graph.

    Raises:
        ValueError: If either node is not of an accepted kind.
    """
    # Resolve the retriever into (function, node name).
    retriever_is_pair = isinstance(retriever_node, tuple)
    if retriever_is_pair:
        retriever_func, retriever_name = retriever_node
    else:
        retriever_func, retriever_name = retriever_node, "RETRIEVE"
    if not callable(retriever_func) and not isinstance(retriever_func, ToolNode):
        subject = "retriever_node[0]" if retriever_is_pair else "retriever_node"
        raise ValueError(f"{subject} must be callable or ToolNode")

    # Resolve the synthesizer into (function, node name).
    synthesize_is_pair = isinstance(synthesize_node, tuple)
    if synthesize_is_pair:
        synthesize_func, synthesize_name = synthesize_node
    else:
        synthesize_func, synthesize_name = synthesize_node, "SYNTHESIZE"
    if not callable(synthesize_func):
        subject = "synthesize_node[0]" if synthesize_is_pair else "synthesize_node"
        raise ValueError(f"{subject} must be callable")

    graph = self._graph
    graph.add_node(retriever_name, retriever_func)  # type: ignore[arg-type]
    graph.add_node(synthesize_name, synthesize_func)

    # Static part of the flow: retrieve first, then synthesize.
    graph.add_edge(retriever_name, synthesize_name)
    graph.set_entry_point(retriever_name)

    if followup_condition is None:
        # Without a follow-up policy the run ends right after synthesis.
        def _cond(_: AgentState) -> str:
            return END

        followup_condition = _cond

    follow_up_targets = {retriever_name: retriever_name, END: END}
    graph.add_conditional_edges(synthesize_name, followup_condition, follow_up_targets)

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )
compile_advanced
compile_advanced(retriever_nodes, synthesize_node, options=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())

Advanced RAG wiring with hybrid retrieval and optional stages.

Chain

(QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ... -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond

Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).

Source code in pyagenity/prebuilt/agent/rag.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def compile_advanced(
    self,
    retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
    synthesize_node: Callable | tuple[Callable, str],
    options: dict | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Advanced RAG wiring with hybrid retrieval and optional stages.

    Chain:
      (QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ...
      -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond
    Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).

    Args:
        retriever_nodes: Retriever nodes, registered in the given order.
        synthesize_node: Node that produces the final answer.
        options: Optional stage configuration. The keys read here are
            ``"query_plan"``, ``"merge"``, ``"rerank"``, ``"compress"`` and
            ``"followup_condition"``; a missing key disables that stage.
        checkpointer, store, interrupt_before, interrupt_after,
        callback_manager: Forwarded unchanged to ``StateGraph.compile``.

    Returns:
        The compiled graph.
    """
    opts = options or {}

    # Nodes are registered in pipeline order: planner, retrievers, then the
    # optional post-retrieval stages, and finally the synthesizer.
    plan_name = self._add_optional_node(
        opts.get("query_plan"),
        default_name="QUERY_PLAN",
        label="query_plan",
    )
    retriever_names = self._add_retriever_nodes(retriever_nodes)
    merge_name = self._add_optional_node(opts.get("merge"), default_name="MERGE", label="merge")
    rerank_name = self._add_optional_node(
        opts.get("rerank"),
        default_name="RERANK",
        label="rerank",
    )
    compress_name = self._add_optional_node(
        opts.get("compress"),
        default_name="COMPRESS",
        label="compress",
    )
    synthesize_name = self._add_synthesize_node(synthesize_node)

    # Connect everything end-to-end, including the follow-up branch.
    self._wire_advanced_edges(
        plan_name,
        retriever_names,
        merge_name,
        rerank_name,
        compress_name,
        synthesize_name,
        opts.get("followup_condition"),
    )

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

ReactAgent

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/react.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class ReactAgent[StateT: AgentState]:
    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        main_node: tuple[Callable, str] | Callable,
        tool_node: tuple[Callable, str] | Callable,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Determine main node function and name
        if isinstance(main_node, tuple):
            main_func, main_name = main_node
            if not callable(main_func):
                raise ValueError("main_node[0] must be a callable function")
        else:
            main_func = main_node
            main_name = "MAIN"
            if not callable(main_func):
                raise ValueError("main_node must be a callable function")

        # Determine tool node function and name
        if isinstance(tool_node, tuple):
            tool_func, tool_name = tool_node
            # Accept both callable functions and ToolNode instances
            if not callable(tool_func) and not hasattr(tool_func, "invoke"):
                raise ValueError("tool_node[0] must be a callable function or ToolNode")
        else:
            tool_func = tool_node
            tool_name = "TOOL"
            # Accept both callable functions and ToolNode instances
            # ToolNode instances have an 'invoke' method but are not callable
            if not callable(tool_func) and not hasattr(tool_func, "invoke"):
                raise ValueError("tool_node must be a callable function or ToolNode instance")

        self._graph.add_node(main_name, main_func)
        self._graph.add_node(tool_name, tool_func)

        # Now create edges
        self._graph.add_conditional_edges(
            main_name,
            _should_use_tools,
            {tool_name: tool_name, END: END},
        )

        self._graph.add_edge(tool_name, main_name)
        self._graph.set_entry_point(main_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/react.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that ``compile`` later populates.

    Every argument is forwarded unchanged to ``StateGraph``. The default
    ``id_generator`` instance is created once, when the function is defined,
    and is therefore shared by all calls that omit it.
    """
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(main_node, tool_node, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/react.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def compile(
    self,
    main_node: tuple[Callable, str] | Callable,
    tool_node: tuple[Callable, str] | Callable,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the main and tool nodes, wire the loop, and compile the graph.

    Args:
        main_node: Callable, or ``(callable, name)``. The name defaults to
            ``"MAIN"``.
        tool_node: Callable or object exposing ``invoke``, or such an object
            paired with a name. The name defaults to ``"TOOL"``.
        checkpointer, store, interrupt_before, interrupt_after,
        callback_manager: Forwarded unchanged to ``StateGraph.compile``.

    Returns:
        The compiled graph.

    Raises:
        ValueError: If either node is not of an accepted kind.
    """
    # Resolve the main node into (function, node name).
    main_is_pair = isinstance(main_node, tuple)
    if main_is_pair:
        main_func, main_name = main_node
    else:
        main_func, main_name = main_node, "MAIN"
    if not callable(main_func):
        raise ValueError(
            "main_node[0] must be a callable function"
            if main_is_pair
            else "main_node must be a callable function"
        )

    # Resolve the tool node into (function, node name).
    tool_is_pair = isinstance(tool_node, tuple)
    if tool_is_pair:
        tool_func, tool_name = tool_node
    else:
        tool_func, tool_name = tool_node, "TOOL"
    # ToolNode instances have an ``invoke`` method but are not callable,
    # so either property is accepted.
    if not (callable(tool_func) or hasattr(tool_func, "invoke")):
        raise ValueError(
            "tool_node[0] must be a callable function or ToolNode"
            if tool_is_pair
            else "tool_node must be a callable function or ToolNode instance"
        )

    graph = self._graph
    graph.add_node(main_name, main_func)
    graph.add_node(tool_name, tool_func)

    # After the main node: either use the tool node or finish.
    branch_targets = {tool_name: tool_name, END: END}
    graph.add_conditional_edges(main_name, _should_use_tools, branch_targets)

    # The tool node always returns to the main node.
    graph.add_edge(tool_name, main_name)
    graph.set_entry_point(main_name)

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

RouterAgent

A configurable router-style agent.

Pattern:
- A router node runs (LLM or custom logic) and may update state/messages.
- A condition function inspects the state and returns a route key.
- Edges route to the matching node; each route returns back to ROUTER.
- Return END (via the condition) to finish.

Usage

router = RouterAgent()
app = router.compile(
    router_node=my_router_func,  # def my_router_func(state, config, ...)
    routes={
        "search": search_node,
        "summarize": summarize_node,
    },
    # Condition inspects state and returns one of the keys above or END
    condition=my_condition,  # def my_condition(state) -> str
    # Optional explicit path map if returned keys differ from node names
    # path_map={"SEARCH": "search", "SUM": "summarize", END: END}
)

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/router.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class RouterAgent[StateT: AgentState]:
    """A configurable router-style agent.

    Pattern:
    - A router node runs (LLM or custom logic) and may update state/messages
    - A condition function inspects the state and returns a route key
    - Edges route to the matching node; each route returns back to ROUTER
    - Return END (via condition) to finish

    Usage:
        router = RouterAgent()
        app = router.compile(
            router_node=my_router_func,  # def my_router_func(state, config, ...)
            routes={
                "search": search_node,
                "summarize": summarize_node,
            },
            # Condition inspects state and returns one of the keys above or END
            condition=my_condition,  # def my_condition(state) -> str
            # Optional explicit path map if returned keys differ from node names
            # path_map={"SEARCH": "search", "SUM": "summarize", END: END}
        )
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        router_node: Callable | tuple[Callable, str],
        routes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        condition: Callable[[AgentState], str] | None = None,
        path_map: dict[str, str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle router_node
        if isinstance(router_node, tuple):
            router_func, router_name = router_node
            if not callable(router_func):
                raise ValueError("router_node[0] must be callable")
        else:
            router_func = router_node
            router_name = "ROUTER"
            if not callable(router_func):
                raise ValueError("router_node must be callable")

        if not routes:
            raise ValueError("routes must be a non-empty dict of name -> callable/ToolNode/tuple")

        # Add route nodes
        route_names = []
        for key, func in routes.items():
            if isinstance(func, tuple):
                route_func, route_name = func
                if not (callable(route_func) or isinstance(route_func, ToolNode)):
                    raise ValueError(f"Route '{key}'[0] must be callable or ToolNode")
            else:
                route_func = func
                route_name = key
                if not (callable(route_func) or isinstance(route_func, ToolNode)):
                    raise ValueError(f"Route '{key}' must be callable or ToolNode")
            self._graph.add_node(route_name, route_func)
            route_names.append(route_name)

        # Add router node as entry
        self._graph.add_node(router_name, router_func)

        # Build default condition/path_map if needed
        if condition is None and len(route_names) == 1:
            only = route_names[0]

            def _always(_: AgentState) -> str:
                return only

            condition = _always
            path_map = {only: only, END: END}

        if condition is None and len(route_names) > 1:
            raise ValueError("condition must be provided when multiple routes are defined")

        # If path_map is not provided, assume router returns exact route names
        if path_map is None:
            path_map = {k: k for k in route_names}
            path_map[END] = END

        # Conditional edges from router node based on condition results
        self._graph.add_conditional_edges(
            router_name,
            condition,  # type: ignore[arg-type]
            path_map,
        )

        # Loop back to router node from each route node
        for name in route_names:
            self._graph.add_edge(name, router_name)

        # Entry
        self._graph.set_entry_point(router_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/router.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that ``compile`` later populates.

    Every argument is forwarded unchanged to ``StateGraph``. The default
    ``id_generator`` instance is created once, when the function is defined,
    and is therefore shared by all calls that omit it.
    """
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(router_node, routes, condition=None, path_map=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/router.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def compile(  # noqa: PLR0912
    self,
    router_node: Callable | tuple[Callable, str],
    routes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    condition: Callable[[AgentState], str] | None = None,
    path_map: dict[str, str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the router and route nodes, wire them, and compile the graph.

    Args:
        router_node: Callable, or ``(callable, name)``. The name defaults to
            ``"ROUTER"``.
        routes: Maps a route key to a callable or ToolNode, or to such a node
            paired with an explicit node name. Without an explicit name the
            key is used as the node name.
        condition: Called after the router node; its result is looked up in
            ``path_map``. May be omitted only when there is exactly one
            route, in which case that route is always chosen and ``path_map``
            is replaced accordingly.
        path_map: Maps condition results to node names. Defaults to the
            identity mapping over the route node names plus END.
        checkpointer, store, interrupt_before, interrupt_after,
        callback_manager: Forwarded unchanged to ``StateGraph.compile``.

    Returns:
        The compiled graph.

    Raises:
        ValueError: If a node is not of an accepted kind, ``routes`` is
            empty, or ``condition`` is missing with several routes.
    """
    # Resolve the router into (function, node name).
    router_is_pair = isinstance(router_node, tuple)
    if router_is_pair:
        router_func, router_name = router_node
    else:
        router_func, router_name = router_node, "ROUTER"
    if not callable(router_func):
        subject = "router_node[0]" if router_is_pair else "router_node"
        raise ValueError(f"{subject} must be callable")

    if not routes:
        raise ValueError("routes must be a non-empty dict of name -> callable/ToolNode/tuple")

    graph = self._graph

    # Register every route; remember node names for the wiring below.
    route_names = []
    for key, spec in routes.items():
        spec_is_pair = isinstance(spec, tuple)
        if spec_is_pair:
            route_func, route_name = spec
        else:
            route_func, route_name = spec, key
        if not callable(route_func) and not isinstance(route_func, ToolNode):
            suffix = "[0]" if spec_is_pair else ""
            raise ValueError(f"Route '{key}'{suffix} must be callable or ToolNode")
        graph.add_node(route_name, route_func)
        route_names.append(route_name)

    graph.add_node(router_name, router_func)

    if condition is None:
        if len(route_names) == 1:
            # A single route needs no decision: always take it.
            only = route_names[0]

            def _always(_: AgentState) -> str:
                return only

            condition = _always
            path_map = {only: only, END: END}
        elif len(route_names) > 1:
            raise ValueError("condition must be provided when multiple routes are defined")

    if path_map is None:
        # Assume the condition returns exact route node names (or END).
        path_map = {name: name for name in route_names} | {END: END}

    graph.add_conditional_edges(
        router_name,
        condition,  # type: ignore[arg-type]
        path_map,
    )

    # Every route hands control back to the router.
    for route_name in route_names:
        graph.add_edge(route_name, router_name)

    graph.set_entry_point(router_name)

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

SequentialAgent

A simple sequential agent that executes a fixed pipeline of nodes.

Pattern:
- Nodes run in the provided order: step1 -> step2 -> ... -> stepN.
- After the last step, the graph ends.

Usage

seq = SequentialAgent()
app = seq.compile([
    ("ingest", ingest_node),
    ("plan", plan_node),
    ("execute", execute_node),
])

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/sequential.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class SequentialAgent[StateT: AgentState]:
    """A simple sequential agent that executes a fixed pipeline of nodes.

    Pattern:
    - Nodes run in the provided order: step1 -> step2 -> ... -> stepN
    - After the last step, the graph ends

    Usage:
        seq = SequentialAgent()
        app = seq.compile([
            ("ingest", ingest_node),
            ("plan", plan_node),
            ("execute", execute_node),
        ])
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        steps: Sequence[tuple[str, Callable | ToolNode] | tuple[Callable | ToolNode, str]],
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not steps or len(steps) == 0:
            raise ValueError(
                "steps must be a non-empty sequence of (name, callable/ToolNode) o"
                "or (callable/ToolNode, name)"
            )

        # Add nodes
        step_names = []
        for step in steps:
            if isinstance(step[0], str):
                name, func = step
            else:
                func, name = step
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Step '{name}' must be a callable or ToolNode")
            self._graph.add_node(name, func)  # type: ignore[arg-type]
            step_names.append(name)

        # Static edges in order
        for i in range(len(step_names) - 1):
            self._graph.add_edge(step_names[i], step_names[i + 1])

        # Entry is the first step
        self._graph.set_entry_point(step_names[0])

        # No explicit edge to END needed; the engine will end if no outgoing edges remain.
        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/sequential.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the underlying ``StateGraph`` that ``compile`` later populates.

    Every argument is forwarded unchanged to ``StateGraph``. The default
    ``id_generator`` instance is created once, when the function is defined,
    and is therefore shared by all calls that omit it.
    """
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(steps, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/sequential.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def compile(
    self,
    steps: Sequence[tuple[str, Callable | ToolNode] | tuple[Callable | ToolNode, str]],
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the steps as a linear chain and compile the graph.

    Args:
        steps: Pipeline steps in execution order. Each item is either
            ``(name, node)`` or ``(node, name)``; the two forms are told
            apart by whether the first element is a string.
        checkpointer, store, interrupt_before, interrupt_after,
        callback_manager: Forwarded unchanged to ``StateGraph.compile``.

    Returns:
        The compiled graph, entered at the first step.

    Raises:
        ValueError: If ``steps`` is empty or a step's node is neither
            callable nor a ToolNode.
    """
    if not steps:
        # The first literal must end with a space: adjacent literals are
        # concatenated as-is.
        raise ValueError(
            "steps must be a non-empty sequence of (name, callable/ToolNode) "
            "or (callable/ToolNode, name)"
        )

    # Add nodes
    step_names = []
    for step in steps:
        if isinstance(step[0], str):
            name, func = step
        else:
            func, name = step
        if not (callable(func) or isinstance(func, ToolNode)):
            raise ValueError(f"Step '{name}' must be a callable or ToolNode")
        self._graph.add_node(name, func)  # type: ignore[arg-type]
        step_names.append(name)

    # Static edges between consecutive steps, in order
    for current, following in zip(step_names, step_names[1:]):
        self._graph.add_edge(current, following)

    # Entry is the first step
    self._graph.set_entry_point(step_names[0])

    # No explicit edge to END needed; the engine will end if no outgoing edges remain.
    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

SupervisorTeamAgent

Supervisor routes tasks to worker nodes and aggregates results.

Nodes:
- SUPERVISOR: decides which worker to call (by returning a worker key) or END.
- Multiple WORKER nodes: functions or ToolNode instances.
- AGGREGATE: optional aggregator node after worker runs; loops back to SUPERVISOR.

The compile requires

- supervisor_node: Callable
- workers: dict[str, Callable|ToolNode]
- aggregate_node: Callable | None
- condition: Callable[[AgentState], str], which returns a worker key or END

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/supervisor_team.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class SupervisorTeamAgent[StateT: AgentState]:
    """Supervisor routes tasks to worker nodes and aggregates results.

    Nodes:
    - SUPERVISOR: decides which worker to call (by returning a worker key) or END
    - Multiple WORKER nodes: functions or ToolNode instances
    - AGGREGATE: optional aggregator node after worker runs; loops back to SUPERVISOR

    The compile requires:
      supervisor_node: Callable
      workers: dict[str, Callable|ToolNode]
      aggregate_node: Callable | None
      condition: Callable[[AgentState], str] returns worker key or END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        supervisor_node: Callable | tuple[Callable, str],
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        condition: Callable[[AgentState], str],
        aggregate_node: Callable | tuple[Callable, str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle supervisor_node
        if isinstance(supervisor_node, tuple):
            supervisor_func, supervisor_name = supervisor_node
            if not callable(supervisor_func):
                raise ValueError("supervisor_node[0] must be callable")
        else:
            supervisor_func = supervisor_node
            supervisor_name = "SUPERVISOR"
            if not callable(supervisor_func):
                raise ValueError("supervisor_node must be callable")

        if not workers:
            raise ValueError("workers must be a non-empty dict")

        # Add worker nodes
        worker_names = []
        for key, fn in workers.items():
            if isinstance(fn, tuple):
                worker_func, worker_name = fn
                if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                    raise ValueError(f"Worker '{key}'[0] must be callable or ToolNode")
            else:
                worker_func = fn
                worker_name = key
                if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                    raise ValueError(f"Worker '{key}' must be callable or ToolNode")
            self._graph.add_node(worker_name, worker_func)
            worker_names.append(worker_name)

        # Handle aggregate_node
        aggregate_name = "AGGREGATE"
        if aggregate_node:
            if isinstance(aggregate_node, tuple):
                aggregate_func, aggregate_name = aggregate_node
                if not callable(aggregate_func):
                    raise ValueError("aggregate_node[0] must be callable")
            else:
                aggregate_func = aggregate_node
                aggregate_name = "AGGREGATE"
                if not callable(aggregate_func):
                    raise ValueError("aggregate_node must be callable")
            self._graph.add_node(aggregate_name, aggregate_func)

        # SUPERVISOR decides next worker
        path_map = {k: k for k in worker_names}
        path_map[END] = END
        self._graph.add_conditional_edges(supervisor_name, condition, path_map)

        # After worker, go to AGGREGATE if present, else back to SUPERVISOR
        for name in worker_names:
            self._graph.add_edge(name, aggregate_name if aggregate_node else supervisor_name)

        if aggregate_node:
            self._graph.add_edge(aggregate_name, supervisor_name)

        self._graph.set_entry_point(supervisor_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/supervisor_team.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Build the internal ``StateGraph`` and keep it on ``self._graph``.

    Every argument is passed through to ``StateGraph`` by keyword, unchanged.
    """
    graph_factory = StateGraph[StateT]
    graph = graph_factory(
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
    self._graph = graph
compile
compile(supervisor_node, workers, condition, aggregate_node=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/supervisor_team.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def compile(  # noqa: PLR0912
    self,
    supervisor_node: Callable | tuple[Callable, str],
    workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    condition: Callable[[AgentState], str],
    aggregate_node: Callable | tuple[Callable, str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the supervisor/worker/aggregate topology and compile the graph.

    Args:
        supervisor_node: Callable, or ``(callable, node_name)``. The node is
            named ``"SUPERVISOR"`` when no name is given.
        workers: Non-empty mapping of worker key to a callable/ToolNode, or to
            ``(callable_or_toolnode, node_name)``. Without an explicit name the
            key is used as the node name.
        condition: Routing function attached to the supervisor node; it returns
            a worker key, a worker node name, or ``END``.
        aggregate_node: Optional callable, or ``(callable, node_name)``, placed
            between every worker and the supervisor. Named ``"AGGREGATE"`` when
            no name is given.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The compiled graph produced by ``StateGraph.compile``.

    Raises:
        ValueError: If ``workers`` is empty, or the supervisor, a worker or the
            aggregate node is not callable (workers may also be a ToolNode).
    """
    # Handle supervisor_node
    if isinstance(supervisor_node, tuple):
        supervisor_func, supervisor_name = supervisor_node
        if not callable(supervisor_func):
            raise ValueError("supervisor_node[0] must be callable")
    else:
        supervisor_func = supervisor_node
        supervisor_name = "SUPERVISOR"
        if not callable(supervisor_func):
            raise ValueError("supervisor_node must be callable")

    if not workers:
        raise ValueError("workers must be a non-empty dict")

    # Register the supervisor itself. It is the entry point, the source of the
    # conditional edges and the target of the worker/aggregate edges below, so
    # it has to exist as a node like every other participant.
    self._graph.add_node(supervisor_name, supervisor_func)

    # Add worker nodes
    worker_names = []
    for key, fn in workers.items():
        if isinstance(fn, tuple):
            worker_func, worker_name = fn
            if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                raise ValueError(f"Worker '{key}'[0] must be callable or ToolNode")
        else:
            worker_func = fn
            worker_name = key
            if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                raise ValueError(f"Worker '{key}' must be callable or ToolNode")
        self._graph.add_node(worker_name, worker_func)
        worker_names.append(worker_name)

    # Handle aggregate_node
    aggregate_name = "AGGREGATE"
    if aggregate_node:
        if isinstance(aggregate_node, tuple):
            aggregate_func, aggregate_name = aggregate_node
            if not callable(aggregate_func):
                raise ValueError("aggregate_node[0] must be callable")
        else:
            aggregate_func = aggregate_node
            if not callable(aggregate_func):
                raise ValueError("aggregate_node must be callable")
        self._graph.add_node(aggregate_name, aggregate_func)

    # SUPERVISOR decides next worker. Node names always route to themselves; a
    # worker's dict key is accepted too, so a condition that returns the key
    # still reaches a worker registered under a custom node name. setdefault
    # keeps a real node name from being shadowed by a key.
    path_map = {name: name for name in worker_names}
    for key, name in zip(workers, worker_names, strict=True):
        path_map.setdefault(key, name)
    path_map[END] = END
    self._graph.add_conditional_edges(supervisor_name, condition, path_map)

    # After worker, go to AGGREGATE if present, else back to SUPERVISOR
    after_worker = aggregate_name if aggregate_node else supervisor_name
    for name in worker_names:
        self._graph.add_edge(name, after_worker)

    if aggregate_node:
        self._graph.add_edge(aggregate_name, supervisor_name)

    self._graph.set_entry_point(supervisor_name)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Notes:
- The underlying engine executes nodes sequentially; true parallelism isn't performed at the graph level. For concurrency, worker/collector nodes can internally use BackgroundTaskManager or async to fan out.
- This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

Nodes:
- optional DISPATCH: prepare/plan the swarm task
- WORKER_i: a set of worker nodes (Callable or ToolNode)
- optional COLLECT: consolidate each worker's result into shared state
- CONSENSUS: aggregate all collected results and produce final output

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/swarm.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class SwarmAgent[StateT: AgentState]:
    """Swarm pattern: dispatch to many workers, collect, then reach consensus.

    Notes:
    - The underlying engine executes nodes sequentially; true parallelism isn't
      performed at the graph level. For concurrency, worker/collector nodes can
      internally use BackgroundTaskManager or async to fan-out.
    - This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

    Nodes:
    - optional DISPATCH: prepare/plan the swarm task
    - WORKER_i: a set of worker nodes (Callable or ToolNode)
    - optional COLLECT: consolidate each worker's result into shared state
    - CONSENSUS: aggregate all collected results and produce final output
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        consensus_node: Callable | tuple[Callable, str],
        options: dict | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        resolved_workers = self._add_worker_nodes(workers)
        worker_sequence = resolved_workers

        options = options or {}
        dispatch_node = options.get("dispatch")
        collect_node = options.get("collect")
        followup_condition = options.get("followup_condition")

        dispatch_name = self._resolve_dispatch(dispatch_node)
        collect_info = self._resolve_collect(collect_node)
        consensus_name = self._resolve_consensus(consensus_node)

        entry = dispatch_name or worker_sequence[0]
        self._graph.set_entry_point(entry)
        if dispatch_name:
            self._graph.add_edge(dispatch_name, worker_sequence[0])

        self._wire_edges(worker_sequence, collect_info, consensus_name)

        if followup_condition is None:

            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        self._graph.add_conditional_edges(
            consensus_name,
            followup_condition,
            {entry: entry, END: END},
        )

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    # ---- helpers ----
    def _add_worker_nodes(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    ) -> list[str]:
        if not workers:
            raise ValueError("workers must be a non-empty dict")

        names: list[str] = []
        for key, fn in workers.items():
            if isinstance(fn, tuple):
                func, name = fn
            else:
                func, name = fn, key
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Worker '{key}' must be a callable or ToolNode")
            self._graph.add_node(name, func)
            names.append(name)
        return names

    def _resolve_dispatch(self, node: Callable | tuple[Callable, str] | None) -> str | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "DISPATCH"
        if not callable(func):
            raise ValueError("dispatch node must be callable")
        self._graph.add_node(name, func)
        return name

    def _resolve_collect(
        self,
        node: Callable | tuple[Callable, str] | None,
    ) -> tuple[Callable, str] | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "COLLECT"
        if not callable(func):
            raise ValueError("collect node must be callable")
        # Do not add a single shared collect node to avoid ambiguous routing.
        # We'll create per-worker collect nodes during wiring using this (func, base_name).
        return func, name

    def _resolve_consensus(self, node: Callable | tuple[Callable, str]) -> str:
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "CONSENSUS"
        if not callable(func):
            raise ValueError("consensus node must be callable")
        self._graph.add_node(name, func)
        return name

    def _wire_edges(
        self,
        worker_sequence: list[str],
        collect_info: tuple[Callable, str] | None,
        consensus_name: str,
    ) -> None:
        for i, wname in enumerate(worker_sequence):
            is_last = i == len(worker_sequence) - 1
            target = consensus_name if is_last else worker_sequence[i + 1]
            if collect_info:
                cfunc, base = collect_info
                cname = f"{base}_{i + 1}"
                # Create a dedicated collect node per worker to prevent loops
                self._graph.add_node(cname, cfunc)
                self._graph.add_edge(wname, cname)
                self._graph.add_edge(cname, target)
            else:
                self._graph.add_edge(wname, target)

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/swarm.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Build the internal ``StateGraph`` and keep it on ``self._graph``.

    Every argument is passed through to ``StateGraph`` by keyword, unchanged.
    """
    graph_factory = StateGraph[StateT]
    graph = graph_factory(
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
    self._graph = graph
compile
compile(workers, consensus_node, options=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/swarm.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def compile(
    self,
    workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    consensus_node: Callable | tuple[Callable, str],
    options: dict | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the worker chain, consensus node and follow-up routing, then compile.

    ``options`` may carry three keys: ``"dispatch"``, ``"collect"`` and
    ``"followup_condition"``. The follow-up condition is attached to the
    consensus node and mapped to the entry node or ``END``; when it is missing,
    a condition that always returns ``END`` is used. The remaining arguments
    are forwarded to ``self._graph.compile``.
    """
    worker_names = self._add_worker_nodes(workers)

    settings = options or {}
    dispatch_spec = settings.get("dispatch")
    collect_spec = settings.get("collect")
    followup = settings.get("followup_condition")

    # Resolve in a fixed order: dispatch, collect, consensus.
    dispatch_name = self._resolve_dispatch(dispatch_spec)
    collect_info = self._resolve_collect(collect_spec)
    consensus_name = self._resolve_consensus(consensus_node)

    # Start at the dispatch node when one was supplied, else at the first worker.
    first_worker = worker_names[0]
    entry = dispatch_name if dispatch_name else first_worker
    self._graph.set_entry_point(entry)
    if dispatch_name:
        self._graph.add_edge(dispatch_name, first_worker)

    self._wire_edges(worker_names, collect_info, consensus_name)

    if followup is None:
        # Default: a single pass through the chain.
        def _cond(_: AgentState) -> str:
            return END

        followup = _cond

    routes = {entry: entry, END: END}
    self._graph.add_conditional_edges(consensus_name, followup, routes)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

Modules

branch_join

Classes:

Name Description
BranchJoinAgent

Execute multiple branches then join.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

BranchJoinAgent

Execute multiple branches then join.

Note: This prebuilt models branches sequentially (not true parallel execution). For each provided branch node, we add edges branch_i -> JOIN. The JOIN node decides whether more branches remain or END. A more advanced version could use BackgroundTaskManager for concurrency.

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/branch_join.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class BranchJoinAgent[StateT: AgentState]:
    """Execute multiple branches then join.

    Note: This prebuilt models branches sequentially (not true parallel execution).
    For each provided branch node, we add edges branch_i -> JOIN. The JOIN node
    decides whether more branches remain or END. A more advanced version could
    use BackgroundTaskManager for concurrency.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        branches: dict[str, Callable | tuple[Callable, str]],
        join_node: Callable | tuple[Callable, str],
        next_branch_condition: Callable | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not branches:
            raise ValueError("branches must be a non-empty dict of name -> callable/tuple")

        # Add branch nodes
        branch_names = []
        for key, fn in branches.items():
            if isinstance(fn, tuple):
                branch_func, branch_name = fn
                if not callable(branch_func):
                    raise ValueError(f"Branch '{key}'[0] must be callable")
            else:
                branch_func = fn
                branch_name = key
                if not callable(branch_func):
                    raise ValueError(f"Branch '{key}' must be callable")
            self._graph.add_node(branch_name, branch_func)
            branch_names.append(branch_name)

        # Handle join_node
        if isinstance(join_node, tuple):
            join_func, join_name = join_node
            if not callable(join_func):
                raise ValueError("join_node[0] must be callable")
        else:
            join_func = join_node
            join_name = "JOIN"
            if not callable(join_func):
                raise ValueError("join_node must be callable")
        self._graph.add_node(join_name, join_func)

        # Wire branches to JOIN
        for name in branch_names:
            self._graph.add_edge(name, join_name)

        # Entry: first branch
        first = branch_names[0]
        self._graph.set_entry_point(first)

        # Decide next branch or END after join
        if next_branch_condition is None:
            # default: END after join
            def _cond(_: AgentState) -> str:
                return END

            next_branch_condition = _cond

        # next_branch_condition returns a branch name or END
        path_map = {k: k for k in branch_names}
        path_map[END] = END
        self._graph.add_conditional_edges(join_name, next_branch_condition, path_map)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/branch_join.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Build the internal ``StateGraph`` and keep it on ``self._graph``.

    Every argument is passed through to ``StateGraph`` by keyword, unchanged.
    """
    graph_factory = StateGraph[StateT]
    graph = graph_factory(
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
    self._graph = graph
compile
compile(branches, join_node, next_branch_condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/branch_join.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def compile(
    self,
    branches: dict[str, Callable | tuple[Callable, str]],
    join_node: Callable | tuple[Callable, str],
    next_branch_condition: Callable | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register branches and the join node, wire them together, and compile.

    Each branch is a callable or ``(callable, node_name)``; without a name the
    dict key is used. ``join_node`` follows the same convention with ``"JOIN"``
    as the default name. ``next_branch_condition`` is attached to the join node
    and returns a branch node name or ``END``; when omitted, a condition that
    always returns ``END`` is used. The remaining arguments are forwarded to
    ``self._graph.compile``.

    Raises ``ValueError`` for empty ``branches`` or a non-callable node.
    """
    if not branches:
        raise ValueError("branches must be a non-empty dict of name -> callable/tuple")

    # Register every branch, remembering node names in insertion order.
    node_names: list[str] = []
    for key, spec in branches.items():
        is_pair = isinstance(spec, tuple)
        func, node_name = spec if is_pair else (spec, key)
        if not callable(func):
            raise ValueError(
                f"Branch '{key}'[0] must be callable"
                if is_pair
                else f"Branch '{key}' must be callable"
            )
        self._graph.add_node(node_name, func)
        node_names.append(node_name)

    # Register the join node.
    join_is_pair = isinstance(join_node, tuple)
    join_func, join_name = join_node if join_is_pair else (join_node, "JOIN")
    if not callable(join_func):
        raise ValueError(
            "join_node[0] must be callable" if join_is_pair else "join_node must be callable"
        )
    self._graph.add_node(join_name, join_func)

    # Every branch flows into the join node.
    for node_name in node_names:
        self._graph.add_edge(node_name, join_name)

    # Execution starts at the first branch.
    self._graph.set_entry_point(node_names[0])

    if next_branch_condition is None:
        # No router supplied: finish right after the join.
        def _cond(_: AgentState) -> str:
            return END

        next_branch_condition = _cond

    # The condition's return value selects a branch node name or END.
    routes = {name: name for name in node_names}
    routes[END] = END
    self._graph.add_conditional_edges(join_name, next_branch_condition, routes)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

deep_research

Classes:

Name Description
DeepResearchAgent

Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

DeepResearchAgent

Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

This agent mirrors modern deep-research patterns inspired by DeerFlow and Tongyi DeepResearch: plan tasks, use tools to research, synthesize findings, critique gaps and iterate a bounded number of times.

Nodes:
- PLAN: Decompose problem, propose search/tool actions; may include tool calls
- RESEARCH: ToolNode executes search/browse/calc/etc tools
- SYNTHESIZE: Aggregate and draft a coherent report or partial answer
- CRITIQUE: Identify gaps, contradictions, or follow-ups; can request more tools

Routing:
- PLAN -> conditional(_route_after_plan):
    {"RESEARCH": RESEARCH, "SYNTHESIZE": SYNTHESIZE, END: END}
  • RESEARCH -> SYNTHESIZE
  • SYNTHESIZE -> CRITIQUE
  • CRITIQUE -> conditional(_route_after_critique): {"RESEARCH": RESEARCH, END: END}

Iteration Control:
- Uses execution_meta.internal_data keys:
  - dr_max_iters (int): maximum critique→research loops (default 2)
  - dr_iters (int): current loop count (auto-updated)
  - dr_heavy_mode (bool): if True, bias towards one more loop when critique suggests

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/deep_research.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class DeepResearchAgent[StateT: AgentState]:
    """Deep Research Agent: PLAN → RESEARCH → SYNTHESIZE → CRITIQUE loop.

    This agent mirrors modern deep-research patterns inspired by DeerFlow and
    Tongyi DeepResearch: plan tasks, use tools to research, synthesize findings,
    critique gaps and iterate a bounded number of times.

    Nodes:
    - PLAN: Decompose problem, propose search/tool actions; may include tool calls
    - RESEARCH: ToolNode executes search/browse/calc/etc tools
    - SYNTHESIZE: Aggregate and draft a coherent report or partial answer
    - CRITIQUE: Identify gaps, contradictions, or follow-ups; can request more tools

        Routing:
        - PLAN -> conditional(_route_after_plan):
            {"RESEARCH": RESEARCH, "SYNTHESIZE": SYNTHESIZE, END: END}
    - RESEARCH -> SYNTHESIZE
    - SYNTHESIZE -> CRITIQUE
    - CRITIQUE -> conditional(_route_after_critique): {"RESEARCH": RESEARCH, END: END}

    Iteration Control:
    - Uses execution_meta.internal_data keys:
        dr_max_iters (int): maximum critique→research loops (default 2)
        dr_iters (int): current loop count (auto-updated)
        dr_heavy_mode (bool): if True, bias towards one more loop when critique suggests
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
        max_iters: int = 2,
        heavy_mode: bool = False,
    ):
        # initialize graph
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )
        # seed default internal config on prototype state
        # Note: These values will be copied to new state at invoke time.
        exec_meta: ExecutionState = self._graph._state.execution_meta
        exec_meta.internal_data.setdefault("dr_max_iters", max(0, int(max_iters)))
        exec_meta.internal_data.setdefault("dr_iters", 0)
        exec_meta.internal_data.setdefault("dr_heavy_mode", bool(heavy_mode))

    def compile(  # noqa: PLR0912
        self,
        plan_node: Callable | tuple[Callable, str],
        research_tool_node: ToolNode | tuple[ToolNode, str],
        synthesize_node: Callable | tuple[Callable, str],
        critique_node: Callable | tuple[Callable, str],
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle plan_node
        if isinstance(plan_node, tuple):
            plan_func, plan_name = plan_node
            if not callable(plan_func):
                raise ValueError("plan_node[0] must be callable")
        else:
            plan_func = plan_node
            plan_name = "PLAN"
            if not callable(plan_func):
                raise ValueError("plan_node must be callable")

        # Handle research_tool_node
        if isinstance(research_tool_node, tuple):
            research_func, research_name = research_tool_node
            if not isinstance(research_func, ToolNode):
                raise ValueError("research_tool_node[0] must be a ToolNode")
        else:
            research_func = research_tool_node
            research_name = "RESEARCH"
            if not isinstance(research_func, ToolNode):
                raise ValueError("research_tool_node must be a ToolNode")

        # Handle synthesize_node
        if isinstance(synthesize_node, tuple):
            synthesize_func, synthesize_name = synthesize_node
            if not callable(synthesize_func):
                raise ValueError("synthesize_node[0] must be callable")
        else:
            synthesize_func = synthesize_node
            synthesize_name = "SYNTHESIZE"
            if not callable(synthesize_func):
                raise ValueError("synthesize_node must be callable")

        # Handle critique_node
        if isinstance(critique_node, tuple):
            critique_func, critique_name = critique_node
            if not callable(critique_func):
                raise ValueError("critique_node[0] must be callable")
        else:
            critique_func = critique_node
            critique_name = "CRITIQUE"
            if not callable(critique_func):
                raise ValueError("critique_node must be callable")

        # Add nodes
        self._graph.add_node(plan_name, plan_func)
        self._graph.add_node(research_name, research_func)
        self._graph.add_node(synthesize_name, synthesize_func)
        self._graph.add_node(critique_name, critique_func)

        # Edges
        self._graph.add_conditional_edges(
            plan_name,
            _route_after_plan,
            {research_name: research_name, synthesize_name: synthesize_name, END: END},
        )
        self._graph.add_edge(research_name, synthesize_name)
        self._graph.add_edge(synthesize_name, critique_name)
        self._graph.add_conditional_edges(
            critique_name,
            _route_after_critique,
            {research_name: research_name, END: END},
        )

        # Entry
        self._graph.set_entry_point(plan_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None, max_iters=2, heavy_mode=False)
Source code in pyagenity/prebuilt/agent/deep_research.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
    max_iters: int = 2,
    heavy_mode: bool = False,
):
    """Build the backing StateGraph and seed the deep-research loop settings.

    The graph options are forwarded unchanged to ``StateGraph``. ``max_iters``
    is coerced with ``int()`` and clamped to be non-negative; ``heavy_mode`` is
    coerced with ``bool()``. Keys already present in ``internal_data`` are left
    untouched.
    """
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)

    # Defaults live on the prototype state; per the original note they are
    # copied to the new state at invoke time.
    meta: ExecutionState = self._graph._state.execution_meta
    iteration_cap = max(0, int(max_iters))
    meta.internal_data.setdefault("dr_max_iters", iteration_cap)
    meta.internal_data.setdefault("dr_iters", 0)
    heavy_flag = bool(heavy_mode)
    meta.internal_data.setdefault("dr_heavy_mode", heavy_flag)
compile
compile(plan_node, research_tool_node, synthesize_node, critique_node, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/deep_research.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def compile(  # noqa: PLR0912
    self,
    plan_node: Callable | tuple[Callable, str],
    research_tool_node: ToolNode | tuple[ToolNode, str],
    synthesize_node: Callable | tuple[Callable, str],
    critique_node: Callable | tuple[Callable, str],
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the four nodes into the research loop and compile the graph.

    Each node argument is either the node itself (registered under its default
    name PLAN / RESEARCH / SYNTHESIZE / CRITIQUE) or a ``(node, name)`` tuple
    that supplies a custom name.

    Args:
        plan_node: Callable planning node (entry point).
        research_tool_node: ``ToolNode`` that runs the research tools.
        synthesize_node: Callable synthesis node.
        critique_node: Callable critique node.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The compiled graph returned by ``StateGraph.compile``.

    Raises:
        ValueError: If a node is not callable, or the research node is not a
            ``ToolNode``.
    """
    # Handle plan_node
    if isinstance(plan_node, tuple):
        plan_func, plan_name = plan_node
        if not callable(plan_func):
            raise ValueError("plan_node[0] must be callable")
    else:
        plan_func = plan_node
        plan_name = "PLAN"
        if not callable(plan_func):
            raise ValueError("plan_node must be callable")

    # Handle research_tool_node
    if isinstance(research_tool_node, tuple):
        research_func, research_name = research_tool_node
        if not isinstance(research_func, ToolNode):
            raise ValueError("research_tool_node[0] must be a ToolNode")
    else:
        research_func = research_tool_node
        research_name = "RESEARCH"
        if not isinstance(research_func, ToolNode):
            raise ValueError("research_tool_node must be a ToolNode")

    # Handle synthesize_node
    if isinstance(synthesize_node, tuple):
        synthesize_func, synthesize_name = synthesize_node
        if not callable(synthesize_func):
            raise ValueError("synthesize_node[0] must be callable")
    else:
        synthesize_func = synthesize_node
        synthesize_name = "SYNTHESIZE"
        if not callable(synthesize_func):
            raise ValueError("synthesize_node must be callable")

    # Handle critique_node
    if isinstance(critique_node, tuple):
        critique_func, critique_name = critique_node
        if not callable(critique_func):
            raise ValueError("critique_node[0] must be callable")
    else:
        critique_func = critique_node
        critique_name = "CRITIQUE"
        if not callable(critique_func):
            raise ValueError("critique_node must be callable")

    # Add nodes
    self._graph.add_node(plan_name, plan_func)
    self._graph.add_node(research_name, research_func)
    self._graph.add_node(synthesize_name, synthesize_func)
    self._graph.add_node(critique_name, critique_func)

    # Edges
    # The routers are module-level functions that cannot know custom node
    # names, so the canonical labels ("RESEARCH"/"SYNTHESIZE") are mapped onto
    # the actual names. The name-keyed entries come last so they take
    # precedence, which keeps the previous mapping intact.
    self._graph.add_conditional_edges(
        plan_name,
        _route_after_plan,
        {
            "RESEARCH": research_name,
            "SYNTHESIZE": synthesize_name,
            research_name: research_name,
            synthesize_name: synthesize_name,
            END: END,
        },
    )
    self._graph.add_edge(research_name, synthesize_name)
    self._graph.add_edge(synthesize_name, critique_name)
    self._graph.add_conditional_edges(
        critique_name,
        _route_after_critique,
        {"RESEARCH": research_name, research_name: research_name, END: END},
    )

    # Entry
    self._graph.set_entry_point(plan_name)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

guarded

Classes:

Name Description
GuardedAgent

Validate output and repair until valid or attempts exhausted.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

GuardedAgent

Validate output and repair until valid or attempts exhausted.

Nodes:
  • PRODUCE: main generation node.
  • REPAIR: correction node, run when validation fails.

Edges:
  • PRODUCE -> conditional(valid? END : REPAIR)
  • REPAIR -> PRODUCE

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/guarded.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class GuardedAgent[StateT: AgentState]:
    """Validate output and repair until valid or attempts exhausted.

    Nodes:
    - PRODUCE: main generation node
    - REPAIR: correction node when validation fails

    Edges:
    PRODUCE -> conditional(valid? END : REPAIR)
    REPAIR -> PRODUCE
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        produce_node: Callable | tuple[Callable, str],
        repair_node: Callable | tuple[Callable, str],
        validator: Callable[[AgentState], bool],
        max_attempts: int = 2,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle produce_node
        if isinstance(produce_node, tuple):
            produce_func, produce_name = produce_node
            if not callable(produce_func):
                raise ValueError("produce_node[0] must be callable")
        else:
            produce_func = produce_node
            produce_name = "PRODUCE"
            if not callable(produce_func):
                raise ValueError("produce_node must be callable")

        # Handle repair_node
        if isinstance(repair_node, tuple):
            repair_func, repair_name = repair_node
            if not callable(repair_func):
                raise ValueError("repair_node[0] must be callable")
        else:
            repair_func = repair_node
            repair_name = "REPAIR"
            if not callable(repair_func):
                raise ValueError("repair_node must be callable")

        self._graph.add_node(produce_name, produce_func)
        self._graph.add_node(repair_name, repair_func)

        # produce -> END or REPAIR
        condition = _guard_condition_factory(validator, max_attempts)
        self._graph.add_conditional_edges(
            produce_name,
            condition,
            {repair_name: repair_name, END: END},
        )
        # repair -> produce
        self._graph.add_edge(repair_name, produce_name)

        self._graph.set_entry_point(produce_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/guarded.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the backing StateGraph; every argument is forwarded to it unchanged."""
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(produce_node, repair_node, validator, max_attempts=2, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/guarded.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def compile(
    self,
    produce_node: Callable | tuple[Callable, str],
    repair_node: Callable | tuple[Callable, str],
    validator: Callable[[AgentState], bool],
    max_attempts: int = 2,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the produce/repair nodes, wire the guard loop, and compile.

    Each node may be passed bare (named PRODUCE / REPAIR) or as a
    ``(callable, name)`` tuple. ``validator`` and ``max_attempts`` are handed to
    ``_guard_condition_factory`` to build the routing condition; the remaining
    arguments are forwarded to ``StateGraph.compile``.

    Raises:
        ValueError: If either node is not callable.
    """
    # Normalise the produce node into (callable, name).
    produce_is_pair = isinstance(produce_node, tuple)
    produce_func, produce_name = produce_node if produce_is_pair else (produce_node, "PRODUCE")
    if not callable(produce_func):
        raise ValueError(
            "produce_node[0] must be callable"
            if produce_is_pair
            else "produce_node must be callable"
        )

    # Normalise the repair node into (callable, name).
    repair_is_pair = isinstance(repair_node, tuple)
    repair_func, repair_name = repair_node if repair_is_pair else (repair_node, "REPAIR")
    if not callable(repair_func):
        raise ValueError(
            "repair_node[0] must be callable" if repair_is_pair else "repair_node must be callable"
        )

    self._graph.add_node(produce_name, produce_func)
    self._graph.add_node(repair_name, repair_func)

    # PRODUCE either finishes or hands over to REPAIR.
    # NOTE(review): the path map is keyed by the (possibly custom) repair name;
    # this assumes the condition built by _guard_condition_factory returns that
    # same key — confirm for custom names.
    guard = _guard_condition_factory(validator, max_attempts)
    routes = {repair_name: repair_name, END: END}
    self._graph.add_conditional_edges(produce_name, guard, routes)

    # REPAIR always loops back to PRODUCE.
    self._graph.add_edge(repair_name, produce_name)

    self._graph.set_entry_point(produce_name)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return self._graph.compile(**compile_options)

map_reduce

Classes:

Name Description
MapReduceAgent

Map over items then reduce.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

MapReduceAgent

Map over items then reduce.

Nodes:
  • SPLIT: optional; prepares per-item tasks (or the state already contains the items).
  • MAP: processes one item per iteration.
  • REDUCE: aggregates results and decides whether to END or continue.

Compile requires:
  • map_node: Callable | ToolNode
  • reduce_node: Callable
  • split_node: Callable | None
  • condition: Callable[[AgentState], str] that returns "MAP" to continue or END to finish

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/map_reduce.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class MapReduceAgent[StateT: AgentState]:
    """Map over items then reduce.

    Nodes:
    - SPLIT: optional, prepares per-item tasks (or state already contains items)
    - MAP: processes one item per iteration
    - REDUCE: aggregates results and decides END or continue

    Compile requires:
      map_node: Callable|ToolNode
      reduce_node: Callable
      split_node: Callable | None
      condition: Callable[[AgentState], str] returns "MAP" to continue or END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        map_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
        reduce_node: Callable | tuple[Callable, str],
        split_node: Callable | tuple[Callable, str] | None = None,
        condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle split_node
        split_name = "SPLIT"
        if split_node:
            if isinstance(split_node, tuple):
                split_func, split_name = split_node
                if not callable(split_func):
                    raise ValueError("split_node[0] must be callable")
            else:
                split_func = split_node
                split_name = "SPLIT"
                if not callable(split_func):
                    raise ValueError("split_node must be callable")
            self._graph.add_node(split_name, split_func)

        # Handle map_node
        if isinstance(map_node, tuple):
            map_func, map_name = map_node
            if not (callable(map_func) or isinstance(map_func, ToolNode)):
                raise ValueError("map_node[0] must be callable or ToolNode")
        else:
            map_func = map_node
            map_name = "MAP"
            if not (callable(map_func) or isinstance(map_func, ToolNode)):
                raise ValueError("map_node must be callable or ToolNode")
        self._graph.add_node(map_name, map_func)

        # Handle reduce_node
        if isinstance(reduce_node, tuple):
            reduce_func, reduce_name = reduce_node
            if not callable(reduce_func):
                raise ValueError("reduce_node[0] must be callable")
        else:
            reduce_func = reduce_node
            reduce_name = "REDUCE"
            if not callable(reduce_func):
                raise ValueError("reduce_node must be callable")
        self._graph.add_node(reduce_name, reduce_func)

        # Edges
        if split_node:
            self._graph.add_edge(split_name, map_name)
            self._graph.set_entry_point(split_name)
        else:
            self._graph.set_entry_point(map_name)

        self._graph.add_edge(map_name, reduce_name)

        # Continue mapping or finish
        if condition is None:
            # default: finish after one map-reduce
            def _cond(_: AgentState) -> str:
                return END

            condition = _cond

        self._graph.add_conditional_edges(reduce_name, condition, {map_name: map_name, END: END})

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/map_reduce.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Set up the StateGraph that backs this agent, passing all options straight through."""
    options = dict(
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
    self._graph = StateGraph[StateT](**options)
compile
compile(map_node, reduce_node, split_node=None, condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/map_reduce.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def compile(  # noqa: PLR0912
    self,
    map_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
    reduce_node: Callable | tuple[Callable, str],
    split_node: Callable | tuple[Callable, str] | None = None,
    condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire SPLIT (optional) -> MAP -> REDUCE and compile the graph.

    Each node may be passed bare (named SPLIT / MAP / REDUCE) or as a
    ``(node, name)`` tuple that supplies a custom name.

    Args:
        map_node: Callable or ``ToolNode`` run for the map step.
        reduce_node: Callable run for the reduce step.
        split_node: Optional callable run first; when given it becomes the
            entry point, otherwise the map node is the entry point.
        condition: Router evaluated after REDUCE. It returns "MAP" (or the
            custom map node name) to loop again, or END to finish. When omitted
            the graph ends after a single map-reduce pass.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The compiled graph returned by ``StateGraph.compile``.

    Raises:
        ValueError: If a supplied node fails the callable/ToolNode check.
    """
    # Handle split_node
    split_name = "SPLIT"
    if split_node:
        if isinstance(split_node, tuple):
            split_func, split_name = split_node
            if not callable(split_func):
                raise ValueError("split_node[0] must be callable")
        else:
            split_func = split_node
            split_name = "SPLIT"
            if not callable(split_func):
                raise ValueError("split_node must be callable")
        self._graph.add_node(split_name, split_func)

    # Handle map_node
    if isinstance(map_node, tuple):
        map_func, map_name = map_node
        if not (callable(map_func) or isinstance(map_func, ToolNode)):
            raise ValueError("map_node[0] must be callable or ToolNode")
    else:
        map_func = map_node
        map_name = "MAP"
        if not (callable(map_func) or isinstance(map_func, ToolNode)):
            raise ValueError("map_node must be callable or ToolNode")
    self._graph.add_node(map_name, map_func)

    # Handle reduce_node
    if isinstance(reduce_node, tuple):
        reduce_func, reduce_name = reduce_node
        if not callable(reduce_func):
            raise ValueError("reduce_node[0] must be callable")
    else:
        reduce_func = reduce_node
        reduce_name = "REDUCE"
        if not callable(reduce_func):
            raise ValueError("reduce_node must be callable")
    self._graph.add_node(reduce_name, reduce_func)

    # Edges
    if split_node:
        self._graph.add_edge(split_name, map_name)
        self._graph.set_entry_point(split_name)
    else:
        self._graph.set_entry_point(map_name)

    self._graph.add_edge(map_name, reduce_name)

    # Continue mapping or finish
    if condition is None:
        # default: finish after one map-reduce
        def _cond(_: AgentState) -> str:
            return END

        condition = _cond

    # The documented contract is that `condition` returns "MAP" to continue, so
    # that label must resolve even when the map node has a custom name. The
    # name-keyed entry comes last so it keeps precedence.
    routes = {"MAP": map_name, map_name: map_name, END: END}
    self._graph.add_conditional_edges(reduce_name, condition, routes)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

network

Classes:

Name Description
NetworkAgent

Network pattern: define arbitrary node set and routing policies.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

NetworkAgent

Network pattern: define arbitrary node set and routing policies.

  • Nodes can be callables or ToolNode.
  • Edges can be static or conditional via a router function per node.
  • Entry point is explicit.

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/network.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class NetworkAgent[StateT: AgentState]:
    """Network pattern: define arbitrary node set and routing policies.

    - Nodes can be callables or ToolNode.
    - Edges can be static or conditional via a router function per node.
    - Entry point is explicit.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        nodes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        entry: str,
        static_edges: list[tuple[str, str]] | None = None,
        conditional_edges: list[tuple[str, Callable[[AgentState], str], dict[str, str]]]
        | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not nodes:
            raise ValueError("nodes must be a non-empty dict")

        # Add nodes
        for key, fn in nodes.items():
            if isinstance(fn, tuple):
                func, name = fn
            else:
                func, name = fn, key
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Node '{key}' must be a callable or ToolNode")
            self._graph.add_node(name, func)

        if entry not in self._graph.nodes:
            raise ValueError(f"entry node '{entry}' must be present in nodes")

        # Static edges
        for src, dst in static_edges or []:
            if src not in self._graph.nodes or dst not in self._graph.nodes:
                raise ValueError(f"Invalid static edge {src}->{dst}: unknown node")
            self._graph.add_edge(src, dst)

        # Conditional edges
        for src, cond, pmap in conditional_edges or []:
            if src not in self._graph.nodes:
                raise ValueError(f"Invalid conditional edge: unknown node '{src}'")
            self._graph.add_conditional_edges(src, cond, pmap)

        # Note: callers may include END in path maps; not enforced here.

        self._graph.set_entry_point(entry)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/network.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Instantiate the underlying StateGraph with the supplied options, unmodified."""
    settings = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**settings)
compile
compile(nodes, entry, static_edges=None, conditional_edges=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/network.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def compile(
    self,
    nodes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    entry: str,
    static_edges: list[tuple[str, str]] | None = None,
    conditional_edges: list[tuple[str, Callable[[AgentState], str], dict[str, str]]]
    | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the given nodes and edges on the graph, then compile it.

    Args:
        nodes: Mapping of key -> node. A value is either the node itself
            (registered under its key) or a ``(node, name)`` tuple
            (registered under ``name``).
        entry: Registered name of the node to use as entry point.
        static_edges: Optional ``(source, destination)`` pairs.
        conditional_edges: Optional ``(source, condition, path_map)`` triples.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Returns:
        Whatever the underlying graph's ``compile`` returns.

    Raises:
        ValueError: If ``nodes`` is empty, a node is neither callable nor a
            ``ToolNode``, ``entry`` is not a registered node, or an edge
            refers to an unregistered node.
    """
    if not nodes:
        raise ValueError("nodes must be a non-empty dict")

    graph = self._graph

    # Register nodes; a tuple value supplies its own registration name.
    for key, spec in nodes.items():
        func, name = spec if isinstance(spec, tuple) else (spec, key)
        if not callable(func) and not isinstance(func, ToolNode):
            raise ValueError(f"Node '{key}' must be a callable or ToolNode")
        graph.add_node(name, func)

    # The entry is looked up among registered names, not among the dict keys.
    if entry not in graph.nodes:
        raise ValueError(f"entry node '{entry}' must be present in nodes")

    # Static edges: both endpoints have to be registered already.
    for src, dst in static_edges or ():
        if not (src in graph.nodes and dst in graph.nodes):
            raise ValueError(f"Invalid static edge {src}->{dst}: unknown node")
        graph.add_edge(src, dst)

    # Conditional edges: only the source is checked here. Path-map targets
    # (which callers may point at END) are not validated.
    for src, cond, pmap in conditional_edges or ():
        if src not in graph.nodes:
            raise ValueError(f"Invalid conditional edge: unknown node '{src}'")
        graph.add_conditional_edges(src, cond, pmap)

    graph.set_entry_point(entry)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)

plan_act_reflect

Classes:

Name Description
PlanActReflectAgent

Plan -> Act -> Reflect looping agent.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

PlanActReflectAgent

Plan -> Act -> Reflect looping agent.

Pattern

PLAN -> (condition) -> ACT | REFLECT | END; ACT -> REFLECT; REFLECT -> PLAN

Default condition (_should_act): if the last assistant message contains tool calls -> ACT; if the last message is from a tool -> REFLECT; otherwise -> END.

Provide a custom condition to override this heuristic and implement
  • Budget / depth limiting
  • Confidence-based early stop
  • Dynamic branch selection (e.g., different tool nodes)

Parameters (constructor): state: Optional initial state instance context_manager: Custom context manager publisher: Optional publisher for streaming / events id_generator: ID generation strategy container: InjectQ DI container

compile(...) arguments: plan_node: Callable (state -> state) that produces the next thought / tool requests; tool_node: ToolNode executing declared tools; reflect_node: Callable (state -> state) that consumes tool results and may adjust the plan; condition: optional Callable[[AgentState], str] returning the next node name or END; checkpointer/store/interrupt_before/interrupt_after/callback_manager: standard graph compilation options.

Returns:

Type Description

CompiledGraph ready for invoke / ainvoke.

Notes
  • Node names can be customized via (callable, "NAME") tuples.
  • condition must return one of: tool_node_name, reflect_node_name, END.

Methods:

Name Description
__init__
compile

Compile the Plan-Act-Reflect loop.

Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class PlanActReflectAgent[StateT: AgentState]:
    """Plan -> Act -> Reflect looping agent.

    Pattern:
        PLAN -> (condition) -> ACT | REFLECT | END
        ACT -> REFLECT
        REFLECT -> PLAN

    Default condition (_should_act):
        - If last assistant message contains tool calls -> ACT
        - If last message is from a tool -> REFLECT
        - Else -> END

    Provide a custom condition to override this heuristic and implement:
        * Budget / depth limiting
        * Confidence-based early stop
        * Dynamic branch selection (e.g., different tool nodes)

    Parameters (constructor):
        state: Optional initial state instance
        context_manager: Custom context manager
        publisher: Optional publisher for streaming / events
        id_generator: ID generation strategy
        container: InjectQ DI container

    compile(...) arguments:
        plan_node: Callable (state -> state). Produces next thought / tool requests
        tool_node: ToolNode executing declared tools
        reflect_node: Callable (state -> state). Consumes tool results & may adjust plan
        condition: Optional Callable[[AgentState], str] returning next node name or END
        checkpointer/store/interrupt_before/interrupt_after/callback_manager:
            Standard graph compilation options

    Returns:
        CompiledGraph ready for invoke / ainvoke.

    Notes:
        - Node names can be customized via (callable, "NAME") tuples.
        - condition must return one of: tool_node_name, reflect_node_name, END.
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        plan_node: Callable | tuple[Callable, str],
        tool_node: ToolNode | tuple[ToolNode, str],
        reflect_node: Callable | tuple[Callable, str],
        *,
        condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        """Compile the Plan-Act-Reflect loop.

        Args:
            plan_node: Callable or (callable, name)
            tool_node: ToolNode or (ToolNode, name)
            reflect_node: Callable or (callable, name)
            condition: Optional decision function. Defaults to internal heuristic.
            checkpointer/store/interrupt_* / callback_manager: Standard graph options.

        Returns:
            CompiledGraph
        """
        # PLAN
        if isinstance(plan_node, tuple):
            plan_func, plan_name = plan_node
            if not callable(plan_func):
                raise ValueError("plan_node[0] must be callable")
        else:
            plan_func = plan_node
            plan_name = "PLAN"
            if not callable(plan_func):
                raise ValueError("plan_node must be callable")

        # ACT
        if isinstance(tool_node, tuple):
            tool_func, tool_name = tool_node
            if not isinstance(tool_func, ToolNode):
                raise ValueError("tool_node[0] must be a ToolNode")
        else:
            tool_func = tool_node
            tool_name = "ACT"
            if not isinstance(tool_func, ToolNode):
                raise ValueError("tool_node must be a ToolNode")

        # REFLECT
        if isinstance(reflect_node, tuple):
            reflect_func, reflect_name = reflect_node
            if not callable(reflect_func):
                raise ValueError("reflect_node[0] must be callable")
        else:
            reflect_func = reflect_node
            reflect_name = "REFLECT"
            if not callable(reflect_func):
                raise ValueError("reflect_node must be callable")

        # Register nodes
        self._graph.add_node(plan_name, plan_func)
        self._graph.add_node(tool_name, tool_func)
        self._graph.add_node(reflect_name, reflect_func)

        # Decision
        decision_fn = condition or _should_act
        self._graph.add_conditional_edges(
            plan_name,
            decision_fn,
            {tool_name: tool_name, reflect_name: reflect_name, END: END},
        )

        # Loop edges
        self._graph.add_edge(tool_name, reflect_name)
        self._graph.add_edge(reflect_name, plan_name)

        # Entry
        self._graph.set_entry_point(plan_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` that backs this agent.

    Args:
        state: Optional initial state instance.
        context_manager: Custom context manager.
        publisher: Optional publisher for streaming / events.
        id_generator: ID generation strategy. The default instance is built
            once at definition time and shared by every caller that omits it.
        container: InjectQ DI container.
    """
    graph_cls = StateGraph[StateT]
    self._graph = graph_cls(
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
compile
compile(plan_node, tool_node, reflect_node, *, condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())

Compile the Plan-Act-Reflect loop.

Parameters:

Name Type Description Default
plan_node Callable | tuple[Callable, str]

Callable or (callable, name)

required
tool_node ToolNode | tuple[ToolNode, str]

ToolNode or (ToolNode, name)

required
reflect_node Callable | tuple[Callable, str]

Callable or (callable, name)

required
condition Callable[[AgentState], str] | None

Optional decision function. Defaults to internal heuristic.

None
checkpointer/store/interrupt_* / callback_manager

Standard graph options.

optional (each defaults to None, except callback_manager, which defaults to CallbackManager())

Returns:

Type Description
CompiledGraph

CompiledGraph

Source code in pyagenity/prebuilt/agent/plan_act_reflect.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def compile(
    self,
    plan_node: Callable | tuple[Callable, str],
    tool_node: ToolNode | tuple[ToolNode, str],
    reflect_node: Callable | tuple[Callable, str],
    *,
    condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the Plan-Act-Reflect loop onto the graph and compile it.

    Args:
        plan_node: Callable or (callable, name); default name ``"PLAN"``.
        tool_node: ToolNode or (ToolNode, name); default name ``"ACT"``.
        reflect_node: Callable or (callable, name); default name ``"REFLECT"``.
        condition: Optional decision function run after the plan node.
            Falls back to ``_should_act`` when omitted.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Returns:
        CompiledGraph

    Raises:
        ValueError: If the plan or reflect node is not callable, or the tool
            node is not a ``ToolNode``.
    """
    # PLAN: a tuple carries its own name, a bare callable gets the default.
    plan_is_pair = isinstance(plan_node, tuple)
    plan_func, plan_name = plan_node if plan_is_pair else (plan_node, "PLAN")
    if not callable(plan_func):
        if plan_is_pair:
            raise ValueError("plan_node[0] must be callable")
        raise ValueError("plan_node must be callable")

    # ACT: must be a ToolNode instance; being callable is not sufficient.
    tool_is_pair = isinstance(tool_node, tuple)
    tool_func, tool_name = tool_node if tool_is_pair else (tool_node, "ACT")
    if not isinstance(tool_func, ToolNode):
        if tool_is_pair:
            raise ValueError("tool_node[0] must be a ToolNode")
        raise ValueError("tool_node must be a ToolNode")

    # REFLECT
    reflect_is_pair = isinstance(reflect_node, tuple)
    reflect_func, reflect_name = (
        reflect_node if reflect_is_pair else (reflect_node, "REFLECT")
    )
    if not callable(reflect_func):
        if reflect_is_pair:
            raise ValueError("reflect_node[0] must be callable")
        raise ValueError("reflect_node must be callable")

    graph = self._graph

    # Register the three nodes in plan / act / reflect order.
    for node_name, node_func in (
        (plan_name, plan_func),
        (tool_name, tool_func),
        (reflect_name, reflect_func),
    ):
        graph.add_node(node_name, node_func)

    # After PLAN the decision function picks ACT, REFLECT or END.
    routes = {tool_name: tool_name, reflect_name: reflect_name, END: END}
    graph.add_conditional_edges(plan_name, condition or _should_act, routes)

    # Close the loop: ACT -> REFLECT -> PLAN.
    graph.add_edge(tool_name, reflect_name)
    graph.add_edge(reflect_name, plan_name)

    graph.set_entry_point(plan_name)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)

rag

Classes:

Name Description
RAGAgent

Simple RAG: retrieve -> synthesize; optional follow-up.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

RAGAgent

Simple RAG: retrieve -> synthesize; optional follow-up.

Nodes: RETRIEVE uses a retriever (callable or ToolNode) to fetch context; SYNTHESIZE is the LLM/composer step that builds an answer; an optional condition loops back to RETRIEVE for follow-up queries, otherwise the graph goes to END.

Methods:

Name Description
__init__
compile
compile_advanced

Advanced RAG wiring with hybrid retrieval and optional stages.

Source code in pyagenity/prebuilt/agent/rag.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class RAGAgent[StateT: AgentState]:
    """Simple RAG: retrieve -> synthesize; optional follow-up.

    Nodes:
    - RETRIEVE: uses a retriever (callable or ToolNode) to fetch context
    - SYNTHESIZE: LLM/composer builds an answer
    - Optional condition: loop back to RETRIEVE for follow-up queries; else END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        retriever_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
        synthesize_node: Callable | tuple[Callable, str],
        followup_condition: Callable[[AgentState], str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Nodes
        # Handle retriever_node
        if isinstance(retriever_node, tuple):
            retriever_func, retriever_name = retriever_node
            if not (callable(retriever_func) or isinstance(retriever_func, ToolNode)):
                raise ValueError("retriever_node[0] must be callable or ToolNode")
        else:
            retriever_func = retriever_node
            retriever_name = "RETRIEVE"
            if not (callable(retriever_func) or isinstance(retriever_func, ToolNode)):
                raise ValueError("retriever_node must be callable or ToolNode")

        # Handle synthesize_node
        if isinstance(synthesize_node, tuple):
            synthesize_func, synthesize_name = synthesize_node
            if not callable(synthesize_func):
                raise ValueError("synthesize_node[0] must be callable")
        else:
            synthesize_func = synthesize_node
            synthesize_name = "SYNTHESIZE"
            if not callable(synthesize_func):
                raise ValueError("synthesize_node must be callable")

        self._graph.add_node(retriever_name, retriever_func)  # type: ignore[arg-type]
        self._graph.add_node(synthesize_name, synthesize_func)

        # Edges
        self._graph.add_edge(retriever_name, synthesize_name)
        self._graph.set_entry_point(retriever_name)

        if followup_condition is None:
            # default: END after synthesize
            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        self._graph.add_conditional_edges(
            synthesize_name,
            followup_condition,
            {retriever_name: retriever_name, END: END},
        )

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    def compile_advanced(
        self,
        retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
        synthesize_node: Callable | tuple[Callable, str],
        options: dict | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        """Advanced RAG wiring with hybrid retrieval and optional stages.

        Chain:
          (QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ...
          -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond
        Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).
        """

        options = options or {}
        query_plan_node = options.get("query_plan")
        merger_node = options.get("merge")
        rerank_node = options.get("rerank")
        compress_node = options.get("compress")
        followup_condition = options.get("followup_condition")

        qname = self._add_optional_node(
            query_plan_node,
            default_name="QUERY_PLAN",
            label="query_plan",
        )

        # Add retrievers
        r_names = self._add_retriever_nodes(retriever_nodes)

        # Optional stages
        mname = self._add_optional_node(merger_node, default_name="MERGE", label="merge")
        rrname = self._add_optional_node(rerank_node, default_name="RERANK", label="rerank")
        cname = self._add_optional_node(
            compress_node,
            default_name="COMPRESS",
            label="compress",
        )

        # Synthesize
        sname = self._add_synthesize_node(synthesize_node)

        # Wire edges end-to-end and follow-up
        self._wire_advanced_edges(qname, r_names, mname, rrname, cname, sname, followup_condition)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    # ---- helpers ----
    def _add_optional_node(
        self,
        node: Callable | tuple[Callable, str] | None,
        *,
        default_name: str,
        label: str,
    ) -> str | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, default_name
        if not callable(func):
            raise ValueError(f"{label} node must be callable")
        self._graph.add_node(name, func)
        return name

    def _add_retriever_nodes(
        self,
        retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
    ) -> list[str]:
        if not retriever_nodes:
            raise ValueError("retriever_nodes must be non-empty")
        names: list[str] = []
        for idx, rn in enumerate(retriever_nodes):
            if isinstance(rn, tuple):
                rfunc, rname = rn
            else:
                rfunc, rname = rn, f"RETRIEVE_{idx + 1}"
            if not (callable(rfunc) or isinstance(rfunc, ToolNode)):
                raise ValueError("retriever must be callable or ToolNode")
            self._graph.add_node(rname, rfunc)  # type: ignore[arg-type]
            names.append(rname)
        return names

    def _add_synthesize_node(self, synthesize_node: Callable | tuple[Callable, str]) -> str:
        if isinstance(synthesize_node, tuple):
            sfunc, sname = synthesize_node
        else:
            sfunc, sname = synthesize_node, "SYNTHESIZE"
        if not callable(sfunc):
            raise ValueError("synthesize_node must be callable")
        self._graph.add_node(sname, sfunc)
        return sname

    def _wire_advanced_edges(
        self,
        qname: str | None,
        r_names: list[str],
        mname: str | None,
        rrname: str | None,
        cname: str | None,
        sname: str,
        followup_condition: Callable[[AgentState], str] | None = None,
    ) -> None:
        entry = qname or r_names[0]
        self._graph.set_entry_point(entry)
        if qname:
            self._graph.add_edge(qname, r_names[0])

        tail_target = rrname or cname or sname
        for i, rname in enumerate(r_names):
            is_last = i == len(r_names) - 1
            nxt = r_names[i + 1] if not is_last else tail_target
            if mname:
                self._graph.add_edge(rname, mname)
                self._graph.add_edge(mname, nxt)
            else:
                self._graph.add_edge(rname, nxt)

        # Tail wiring
        if rrname and cname:
            self._graph.add_edge(rrname, cname)
            self._graph.add_edge(cname, sname)
        elif rrname:
            self._graph.add_edge(rrname, sname)
        elif cname:
            self._graph.add_edge(cname, sname)

        # default follow-up to END
        if followup_condition is None:

            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        entry_node = qname or r_names[0]
        path_map = {entry_node: entry_node, END: END}
        self._graph.add_conditional_edges(sname, followup_condition, path_map)
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/rag.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` that the compile methods add nodes and edges to.

    All five arguments are passed to ``StateGraph`` by keyword, unchanged.
    The default ``id_generator`` instance is built once at definition time,
    so callers that omit it share the same object.
    """
    graph_cls = StateGraph[StateT]
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = graph_cls(**graph_options)
compile
compile(retriever_node, synthesize_node, followup_condition=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/rag.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def compile(
    self,
    retriever_node: Callable | ToolNode | tuple[Callable | ToolNode, str],
    synthesize_node: Callable | tuple[Callable, str],
    followup_condition: Callable[[AgentState], str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the basic RETRIEVE -> SYNTHESIZE graph and compile it.

    Args:
        retriever_node: Callable/ToolNode, or (node, name); default name
            ``"RETRIEVE"``.
        synthesize_node: Callable, or (callable, name); default name
            ``"SYNTHESIZE"``.
        followup_condition: Optional function run after the synthesize node;
            its path map offers the retriever name and END. When omitted, a
            condition that always returns END is used.
        checkpointer: Forwarded to the graph's ``compile``.
        store: Forwarded to the graph's ``compile``.
        interrupt_before: Forwarded to the graph's ``compile``.
        interrupt_after: Forwarded to the graph's ``compile``.
        callback_manager: Forwarded to the graph's ``compile``.

    Raises:
        ValueError: If the retriever is neither callable nor a ToolNode, or
            the synthesize node is not callable.
    """
    # Retriever: a tuple carries its own name, otherwise use the default.
    retriever_is_pair = isinstance(retriever_node, tuple)
    retriever_func, retriever_name = (
        retriever_node if retriever_is_pair else (retriever_node, "RETRIEVE")
    )
    if not callable(retriever_func) and not isinstance(retriever_func, ToolNode):
        if retriever_is_pair:
            raise ValueError("retriever_node[0] must be callable or ToolNode")
        raise ValueError("retriever_node must be callable or ToolNode")

    # Synthesizer
    synthesize_is_pair = isinstance(synthesize_node, tuple)
    synthesize_func, synthesize_name = (
        synthesize_node if synthesize_is_pair else (synthesize_node, "SYNTHESIZE")
    )
    if not callable(synthesize_func):
        if synthesize_is_pair:
            raise ValueError("synthesize_node[0] must be callable")
        raise ValueError("synthesize_node must be callable")

    graph = self._graph
    graph.add_node(retriever_name, retriever_func)  # type: ignore[arg-type]
    graph.add_node(synthesize_name, synthesize_func)

    # Fixed path: RETRIEVE -> SYNTHESIZE, starting at RETRIEVE.
    graph.add_edge(retriever_name, synthesize_name)
    graph.set_entry_point(retriever_name)

    if followup_condition is None:
        # No follow-up logic supplied: always finish after synthesizing.
        def _cond(_: AgentState) -> str:
            return END

        followup_condition = _cond

    routes = {retriever_name: retriever_name, END: END}
    graph.add_conditional_edges(synthesize_name, followup_condition, routes)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return graph.compile(**compile_options)
compile_advanced
compile_advanced(retriever_nodes, synthesize_node, options=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())

Advanced RAG wiring with hybrid retrieval and optional stages.

Chain

(QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ... -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond

Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).

Source code in pyagenity/prebuilt/agent/rag.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def compile_advanced(
    self,
    retriever_nodes: list[Callable | ToolNode | tuple[Callable | ToolNode, str]],
    synthesize_node: Callable | tuple[Callable, str],
    options: dict | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire a multi-retriever RAG graph with optional extra stages.

    Chain:
      (QUERY_PLAN?) -> R1 -> (MERGE?) -> R2 -> (MERGE?) -> ...
      -> (RERANK?) -> (COMPRESS?) -> SYNTHESIZE -> cond
    Each retriever may be a different modality (sparse, dense, self-query, MMR, etc.).

    ``options`` is read for the keys ``"query_plan"``, ``"merge"``,
    ``"rerank"``, ``"compress"`` and ``"followup_condition"``. Keys that are
    missing come back as None from ``dict.get``.
    """
    opts = options or {}
    query_plan_node, merger_node, rerank_node, compress_node, followup_condition = (
        opts.get(key)
        for key in ("query_plan", "merge", "rerank", "compress", "followup_condition")
    )

    # Nodes are registered in this order:
    # query plan, retrievers, merge, rerank, compress, synthesize.
    qname = self._add_optional_node(
        query_plan_node, default_name="QUERY_PLAN", label="query_plan"
    )
    r_names = self._add_retriever_nodes(retriever_nodes)
    mname = self._add_optional_node(merger_node, default_name="MERGE", label="merge")
    rrname = self._add_optional_node(rerank_node, default_name="RERANK", label="rerank")
    cname = self._add_optional_node(
        compress_node, default_name="COMPRESS", label="compress"
    )
    sname = self._add_synthesize_node(synthesize_node)

    # Connect everything, including the follow-up decision after synthesis.
    self._wire_advanced_edges(qname, r_names, mname, rrname, cname, sname, followup_condition)

    compile_options = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return self._graph.compile(**compile_options)

react

Classes:

Name Description
ReactAgent

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

ReactAgent

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/react.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class ReactAgent[StateT: AgentState]:
    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        main_node: tuple[Callable, str] | Callable,
        tool_node: tuple[Callable, str] | Callable,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Determine main node function and name
        if isinstance(main_node, tuple):
            main_func, main_name = main_node
            if not callable(main_func):
                raise ValueError("main_node[0] must be a callable function")
        else:
            main_func = main_node
            main_name = "MAIN"
            if not callable(main_func):
                raise ValueError("main_node must be a callable function")

        # Determine tool node function and name
        if isinstance(tool_node, tuple):
            tool_func, tool_name = tool_node
            # Accept both callable functions and ToolNode instances
            if not callable(tool_func) and not hasattr(tool_func, "invoke"):
                raise ValueError("tool_node[0] must be a callable function or ToolNode")
        else:
            tool_func = tool_node
            tool_name = "TOOL"
            # Accept both callable functions and ToolNode instances
            # ToolNode instances have an 'invoke' method but are not callable
            if not callable(tool_func) and not hasattr(tool_func, "invoke"):
                raise ValueError("tool_node must be a callable function or ToolNode instance")

        self._graph.add_node(main_name, main_func)
        self._graph.add_node(tool_name, tool_func)

        # Now create edges
        self._graph.add_conditional_edges(
            main_name,
            _should_use_tools,
            {tool_name: tool_name, END: END},
        )

        self._graph.add_edge(tool_name, main_name)
        self._graph.set_entry_point(main_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/react.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` stored on ``self._graph``.

    Every argument is forwarded unchanged, by keyword, to ``StateGraph``.
    """
    # NOTE: the default ``id_generator`` instance is built once, when this
    # function is defined, and is shared by every call that omits it.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(main_node, tool_node, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/react.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def compile(
    self,
    main_node: tuple[Callable, str] | Callable,
    tool_node: tuple[Callable, str] | Callable,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the main and tool nodes, wire the loop, and compile.

    Args:
        main_node: A callable, or ``(callable, name)``. Without a name the
            node is registered as ``"MAIN"``.
        tool_node: A callable or an object with an ``invoke`` attribute, or
            ``(that, name)``. Without a name the node is registered as
            ``"TOOL"``.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The result of ``StateGraph.compile``.

    Raises:
        ValueError: If the main node is not callable, or the tool node is
            neither callable nor exposes ``invoke``.
    """
    # Main node: accept either a bare callable or a (callable, name) pair.
    main_is_pair = isinstance(main_node, tuple)
    if main_is_pair:
        main_func, main_name = main_node
    else:
        main_func, main_name = main_node, "MAIN"
    if not callable(main_func):
        raise ValueError(
            "main_node[0] must be a callable function"
            if main_is_pair
            else "main_node must be a callable function"
        )

    # Tool node: same two shapes, but objects that are not callable are still
    # accepted when they expose ``invoke`` (ToolNode instances).
    tool_is_pair = isinstance(tool_node, tuple)
    if tool_is_pair:
        tool_func, tool_name = tool_node
    else:
        tool_func, tool_name = tool_node, "TOOL"
    if not (callable(tool_func) or hasattr(tool_func, "invoke")):
        raise ValueError(
            "tool_node[0] must be a callable function or ToolNode"
            if tool_is_pair
            else "tool_node must be a callable function or ToolNode instance"
        )

    graph = self._graph
    graph.add_node(main_name, main_func)
    graph.add_node(tool_name, tool_func)

    # main -> tool or END, decided by _should_use_tools; tool -> main always.
    branch_targets = {tool_name: tool_name, END: END}
    graph.add_conditional_edges(main_name, _should_use_tools, branch_targets)
    graph.add_edge(tool_name, main_name)
    graph.set_entry_point(main_name)

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

router

Classes:

Name Description
RouterAgent

A configurable router-style agent.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

RouterAgent

A configurable router-style agent.

Pattern: - A router node runs (LLM or custom logic) and may update state/messages - A condition function inspects the state and returns a route key - Edges route to the matching node; each route returns back to ROUTER - Return END (via condition) to finish

Usage

router = RouterAgent() app = router.compile( router_node=my_router_func, # def my_router_func(state, config, ...) routes={ "search": search_node, "summarize": summarize_node, }, # Condition inspects state and returns one of the keys above or END condition=my_condition, # def my_condition(state) -> str # Optional explicit path map if returned keys differ from node names # path_map={"SEARCH": "search", "SUM": "summarize", END: END} )

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/router.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class RouterAgent[StateT: AgentState]:
    """A configurable router-style agent.

    Pattern:
    - A router node runs (LLM or custom logic) and may update state/messages
    - A condition function inspects the state and returns a route key
    - Edges route to the matching node; each route returns back to ROUTER
    - Return END (via condition) to finish

    Usage:
        router = RouterAgent()
        app = router.compile(
            router_node=my_router_func,  # def my_router_func(state, config, ...)
            routes={
                "search": search_node,
                "summarize": summarize_node,
            },
            # Condition inspects state and returns one of the keys above or END
            condition=my_condition,  # def my_condition(state) -> str
            # Optional explicit path map if returned keys differ from node names
            # path_map={"SEARCH": "search", "SUM": "summarize", END: END}
        )
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        router_node: Callable | tuple[Callable, str],
        routes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        condition: Callable[[AgentState], str] | None = None,
        path_map: dict[str, str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle router_node
        if isinstance(router_node, tuple):
            router_func, router_name = router_node
            if not callable(router_func):
                raise ValueError("router_node[0] must be callable")
        else:
            router_func = router_node
            router_name = "ROUTER"
            if not callable(router_func):
                raise ValueError("router_node must be callable")

        if not routes:
            raise ValueError("routes must be a non-empty dict of name -> callable/ToolNode/tuple")

        # Add route nodes
        route_names = []
        for key, func in routes.items():
            if isinstance(func, tuple):
                route_func, route_name = func
                if not (callable(route_func) or isinstance(route_func, ToolNode)):
                    raise ValueError(f"Route '{key}'[0] must be callable or ToolNode")
            else:
                route_func = func
                route_name = key
                if not (callable(route_func) or isinstance(route_func, ToolNode)):
                    raise ValueError(f"Route '{key}' must be callable or ToolNode")
            self._graph.add_node(route_name, route_func)
            route_names.append(route_name)

        # Add router node as entry
        self._graph.add_node(router_name, router_func)

        # Build default condition/path_map if needed
        if condition is None and len(route_names) == 1:
            only = route_names[0]

            def _always(_: AgentState) -> str:
                return only

            condition = _always
            path_map = {only: only, END: END}

        if condition is None and len(route_names) > 1:
            raise ValueError("condition must be provided when multiple routes are defined")

        # If path_map is not provided, assume router returns exact route names
        if path_map is None:
            path_map = {k: k for k in route_names}
            path_map[END] = END

        # Conditional edges from router node based on condition results
        self._graph.add_conditional_edges(
            router_name,
            condition,  # type: ignore[arg-type]
            path_map,
        )

        # Loop back to router node from each route node
        for name in route_names:
            self._graph.add_edge(name, router_name)

        # Entry
        self._graph.set_entry_point(router_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/router.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` stored on ``self._graph``.

    Every argument is forwarded unchanged, by keyword, to ``StateGraph``.
    """
    # NOTE: the default ``id_generator`` instance is built once, when this
    # function is defined, and is shared by every call that omits it.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(router_node, routes, condition=None, path_map=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/router.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def compile(  # noqa: PLR0912
    self,
    router_node: Callable | tuple[Callable, str],
    routes: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    condition: Callable[[AgentState], str] | None = None,
    path_map: dict[str, str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the router and route nodes, wire them, and compile.

    Args:
        router_node: A callable, or ``(callable, name)``. Without a name the
            node is registered as ``"ROUTER"``.
        routes: Mapping of route key to a callable/``ToolNode``, or to
            ``(callable/ToolNode, name)``. Without a name the key is used as
            the node name.
        condition: Function used for the conditional edges leaving the
            router. Optional only when exactly one route is given.
        path_map: Mapping from ``condition`` results to node names. When
            omitted, each route name maps to itself and ``END`` to ``END``.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The result of ``StateGraph.compile``.

    Raises:
        ValueError: If the router or a route is not of an accepted kind, if
            ``routes`` is empty, or if ``condition`` is omitted while more
            than one route is defined.
    """
    # Router node: bare callable or (callable, name).
    router_is_pair = isinstance(router_node, tuple)
    if router_is_pair:
        router_func, router_name = router_node
    else:
        router_func, router_name = router_node, "ROUTER"
    if not callable(router_func):
        raise ValueError(
            "router_node[0] must be callable"
            if router_is_pair
            else "router_node must be callable"
        )

    if not routes:
        raise ValueError("routes must be a non-empty dict of name -> callable/ToolNode/tuple")

    graph = self._graph

    # Route nodes are validated and registered one at a time, in dict order.
    route_names = []
    for key, target in routes.items():
        if isinstance(target, tuple):
            route_func, route_name = target
            suffix = "[0]"
        else:
            route_func, route_name = target, key
            suffix = ""
        if not callable(route_func) and not isinstance(route_func, ToolNode):
            raise ValueError(f"Route '{key}'{suffix} must be callable or ToolNode")
        graph.add_node(route_name, route_func)
        route_names.append(route_name)

    # The router node is registered after all of the routes.
    graph.add_node(router_name, router_func)

    if condition is None:
        # ``routes`` is non-empty, so there is either one route or several.
        if len(route_names) > 1:
            raise ValueError("condition must be provided when multiple routes are defined")

        only = route_names[0]

        # NOTE(review): this default never returns END and the route has an
        # edge back to the router, so nothing in this method ends the loop;
        # presumably the engine bounds it elsewhere. Confirm.
        def _always(_: AgentState) -> str:
            return only

        condition = _always
        # In this branch any caller-supplied path_map is replaced.
        path_map = {only: only, END: END}

    # Without an explicit path_map the condition must return exact node names.
    if path_map is None:
        path_map = {name: name for name in route_names} | {END: END}

    graph.add_conditional_edges(
        router_name,
        condition,  # type: ignore[arg-type]
        path_map,
    )

    # Each route hands control back to the router.
    for route_name in route_names:
        graph.add_edge(route_name, router_name)

    graph.set_entry_point(router_name)

    return graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

sequential

Classes:

Name Description
SequentialAgent

A simple sequential agent that executes a fixed pipeline of nodes.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

SequentialAgent

A simple sequential agent that executes a fixed pipeline of nodes.

Pattern: - Nodes run in the provided order: step1 -> step2 -> ... -> stepN - After the last step, the graph ends

Usage

seq = SequentialAgent() app = seq.compile([ ("ingest", ingest_node), ("plan", plan_node), ("execute", execute_node), ])

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/sequential.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class SequentialAgent[StateT: AgentState]:
    """A simple sequential agent that executes a fixed pipeline of nodes.

    Pattern:
    - Nodes run in the provided order: step1 -> step2 -> ... -> stepN
    - After the last step, the graph ends

    Usage:
        seq = SequentialAgent()
        app = seq.compile([
            ("ingest", ingest_node),
            ("plan", plan_node),
            ("execute", execute_node),
        ])
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        steps: Sequence[tuple[str, Callable | ToolNode] | tuple[Callable | ToolNode, str]],
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        if not steps or len(steps) == 0:
            raise ValueError(
                "steps must be a non-empty sequence of (name, callable/ToolNode) o"
                "or (callable/ToolNode, name)"
            )

        # Add nodes
        step_names = []
        for step in steps:
            if isinstance(step[0], str):
                name, func = step
            else:
                func, name = step
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Step '{name}' must be a callable or ToolNode")
            self._graph.add_node(name, func)  # type: ignore[arg-type]
            step_names.append(name)

        # Static edges in order
        for i in range(len(step_names) - 1):
            self._graph.add_edge(step_names[i], step_names[i + 1])

        # Entry is the first step
        self._graph.set_entry_point(step_names[0])

        # No explicit edge to END needed; the engine will end if no outgoing edges remain.
        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/sequential.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` stored on ``self._graph``.

    Every argument is forwarded unchanged, by keyword, to ``StateGraph``.
    """
    # NOTE: the default ``id_generator`` instance is built once, when this
    # function is defined, and is shared by every call that omits it.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(steps, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/sequential.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def compile(
    self,
    steps: Sequence[tuple[str, Callable | ToolNode] | tuple[Callable | ToolNode, str]],
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register the steps as a linear chain and compile the graph.

    Args:
        steps: Non-empty sequence of ``(name, callable/ToolNode)`` or
            ``(callable/ToolNode, name)`` pairs. A pair whose first item is a
            ``str`` is read as name-first.
        checkpointer: Forwarded to ``StateGraph.compile``.
        store: Forwarded to ``StateGraph.compile``.
        interrupt_before: Forwarded to ``StateGraph.compile``.
        interrupt_after: Forwarded to ``StateGraph.compile``.
        callback_manager: Forwarded to ``StateGraph.compile``.

    Returns:
        The result of ``StateGraph.compile``.

    Raises:
        ValueError: If ``steps`` is empty or a step's node is neither callable
            nor a ``ToolNode``.
    """
    if not steps:
        # The first literal ends with a space so the two pieces join cleanly.
        raise ValueError(
            "steps must be a non-empty sequence of (name, callable/ToolNode) "
            "or (callable/ToolNode, name)"
        )

    # Add nodes
    step_names = []
    for step in steps:
        if isinstance(step[0], str):
            name, func = step
        else:
            func, name = step
        if not (callable(func) or isinstance(func, ToolNode)):
            raise ValueError(f"Step '{name}' must be a callable or ToolNode")
        self._graph.add_node(name, func)  # type: ignore[arg-type]
        step_names.append(name)

    # Static edges between each consecutive pair of steps, in order
    for source, target in zip(step_names, step_names[1:]):
        self._graph.add_edge(source, target)

    # Entry is the first step
    self._graph.set_entry_point(step_names[0])

    # No explicit edge to END needed; the engine will end if no outgoing edges remain.
    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

supervisor_team

Classes:

Name Description
SupervisorTeamAgent

Supervisor routes tasks to worker nodes and aggregates results.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

SupervisorTeamAgent

Supervisor routes tasks to worker nodes and aggregates results.

Nodes: - SUPERVISOR: decides which worker to call (by returning a worker key) or END - Multiple WORKER nodes: functions or ToolNode instances - AGGREGATE: optional aggregator node after worker runs; loops back to SUPERVISOR

The compile requires

supervisor_node: Callable workers: dict[str, Callable|ToolNode] aggregate_node: Callable | None condition: Callable[[AgentState], str] returns worker key or END

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/supervisor_team.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class SupervisorTeamAgent[StateT: AgentState]:
    """Supervisor routes tasks to worker nodes and aggregates results.

    Nodes:
    - SUPERVISOR: decides which worker to call (by returning a worker key) or END
    - Multiple WORKER nodes: functions or ToolNode instances
    - AGGREGATE: optional aggregator node after worker runs; loops back to SUPERVISOR

    The compile requires:
      supervisor_node: Callable
      workers: dict[str, Callable|ToolNode]
      aggregate_node: Callable | None
      condition: Callable[[AgentState], str] returns worker key or END
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(  # noqa: PLR0912
        self,
        supervisor_node: Callable | tuple[Callable, str],
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        condition: Callable[[AgentState], str],
        aggregate_node: Callable | tuple[Callable, str] | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        # Handle supervisor_node
        if isinstance(supervisor_node, tuple):
            supervisor_func, supervisor_name = supervisor_node
            if not callable(supervisor_func):
                raise ValueError("supervisor_node[0] must be callable")
        else:
            supervisor_func = supervisor_node
            supervisor_name = "SUPERVISOR"
            if not callable(supervisor_func):
                raise ValueError("supervisor_node must be callable")

        if not workers:
            raise ValueError("workers must be a non-empty dict")

        # Add worker nodes
        worker_names = []
        for key, fn in workers.items():
            if isinstance(fn, tuple):
                worker_func, worker_name = fn
                if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                    raise ValueError(f"Worker '{key}'[0] must be callable or ToolNode")
            else:
                worker_func = fn
                worker_name = key
                if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                    raise ValueError(f"Worker '{key}' must be callable or ToolNode")
            self._graph.add_node(worker_name, worker_func)
            worker_names.append(worker_name)

        # Handle aggregate_node
        aggregate_name = "AGGREGATE"
        if aggregate_node:
            if isinstance(aggregate_node, tuple):
                aggregate_func, aggregate_name = aggregate_node
                if not callable(aggregate_func):
                    raise ValueError("aggregate_node[0] must be callable")
            else:
                aggregate_func = aggregate_node
                aggregate_name = "AGGREGATE"
                if not callable(aggregate_func):
                    raise ValueError("aggregate_node must be callable")
            self._graph.add_node(aggregate_name, aggregate_func)

        # SUPERVISOR decides next worker
        path_map = {k: k for k in worker_names}
        path_map[END] = END
        self._graph.add_conditional_edges(supervisor_name, condition, path_map)

        # After worker, go to AGGREGATE if present, else back to SUPERVISOR
        for name in worker_names:
            self._graph.add_edge(name, aggregate_name if aggregate_node else supervisor_name)

        if aggregate_node:
            self._graph.add_edge(aggregate_name, supervisor_name)

        self._graph.set_entry_point(supervisor_name)

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/supervisor_team.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the ``StateGraph`` stored on ``self._graph``.

    Every argument is forwarded unchanged, by keyword, to ``StateGraph``.
    """
    # NOTE: the default ``id_generator`` instance is built once, when this
    # function is defined, and is shared by every call that omits it.
    graph_options = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_options)
compile
compile(supervisor_node, workers, condition, aggregate_node=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/supervisor_team.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def compile(  # noqa: PLR0912
    self,
    supervisor_node: Callable | tuple[Callable, str],
    workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    condition: Callable[[AgentState], str],
    aggregate_node: Callable | tuple[Callable, str] | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Wire the supervisor/worker topology into the graph and compile it.

    Resulting wiring:

    - the supervisor node is the entry point;
    - a conditional edge leaves the supervisor, driven by ``condition``, whose
      path map holds every resolved worker node name plus ``END``;
    - every worker has an edge to the aggregate node when one is supplied,
      otherwise straight back to the supervisor;
    - the aggregate node, when supplied, has an edge back to the supervisor.

    Args:
        supervisor_node: The supervisor callable, or a ``(callable, name)``
            tuple. A bare callable is registered as ``"SUPERVISOR"``.
        workers: Non-empty mapping of worker nodes. A value is either a
            callable/``ToolNode`` (registered under its dict key) or a
            ``(callable_or_ToolNode, name)`` tuple (registered under ``name``).
        condition: Routing function attached to the supervisor's conditional
            edge. The path map built here only contains the resolved worker
            node names and ``END``.
        aggregate_node: Optional callable, or ``(callable, name)`` tuple, placed
            between the workers and the supervisor. A bare callable is
            registered as ``"AGGREGATE"``.
        checkpointer: Forwarded unchanged to ``self._graph.compile``.
        store: Forwarded unchanged to ``self._graph.compile``.
        interrupt_before: Forwarded unchanged to ``self._graph.compile``.
        interrupt_after: Forwarded unchanged to ``self._graph.compile``.
        callback_manager: Forwarded unchanged to ``self._graph.compile``.

    Returns:
        The result of ``self._graph.compile(...)``.

    Raises:
        ValueError: If the supervisor, a worker, or the aggregate node is not
            callable (workers may also be ``ToolNode`` instances), or if
            ``workers`` is empty.
    """
    # Handle supervisor_node
    if isinstance(supervisor_node, tuple):
        supervisor_func, supervisor_name = supervisor_node
        if not callable(supervisor_func):
            raise ValueError("supervisor_node[0] must be callable")
    else:
        supervisor_func = supervisor_node
        supervisor_name = "SUPERVISOR"
        if not callable(supervisor_func):
            raise ValueError("supervisor_node must be callable")

    if not workers:
        raise ValueError("workers must be a non-empty dict")

    # Register the supervisor itself. Its name is used below as the entry
    # point, as the source of the conditional edges and as an edge target, so
    # the node has to exist in the graph. Done after the emptiness check above
    # so that failing call still leaves the graph untouched.
    self._graph.add_node(supervisor_name, supervisor_func)

    # Add worker nodes
    worker_names = []
    for key, fn in workers.items():
        if isinstance(fn, tuple):
            worker_func, worker_name = fn
            if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                raise ValueError(f"Worker '{key}'[0] must be callable or ToolNode")
        else:
            worker_func = fn
            worker_name = key
            if not (callable(worker_func) or isinstance(worker_func, ToolNode)):
                raise ValueError(f"Worker '{key}' must be callable or ToolNode")
        self._graph.add_node(worker_name, worker_func)
        worker_names.append(worker_name)

    # Handle aggregate_node ("AGGREGATE" unless a tuple supplies another name)
    aggregate_name = "AGGREGATE"
    if aggregate_node:
        if isinstance(aggregate_node, tuple):
            aggregate_func, aggregate_name = aggregate_node
            if not callable(aggregate_func):
                raise ValueError("aggregate_node[0] must be callable")
        else:
            aggregate_func = aggregate_node
            if not callable(aggregate_func):
                raise ValueError("aggregate_node must be callable")
        self._graph.add_node(aggregate_name, aggregate_func)

    # SUPERVISOR decides next worker
    path_map = {k: k for k in worker_names}
    path_map[END] = END
    self._graph.add_conditional_edges(supervisor_name, condition, path_map)

    # After worker, go to AGGREGATE if present, else back to SUPERVISOR
    after_worker = aggregate_name if aggregate_node else supervisor_name
    for name in worker_names:
        self._graph.add_edge(name, after_worker)

    if aggregate_node:
        self._graph.add_edge(aggregate_name, supervisor_name)

    self._graph.set_entry_point(supervisor_name)

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )

swarm

Classes:

Name Description
SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound=AgentState)

Classes

SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Notes:

- The underlying engine executes nodes sequentially; true parallelism isn't performed at the graph level. For concurrency, worker/collector nodes can internally use BackgroundTaskManager or async to fan out.
- This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

Nodes:

- optional DISPATCH: prepare/plan the swarm task
- WORKER_i: a set of worker nodes (Callable or ToolNode)
- optional COLLECT: consolidate each worker's result into shared state
- CONSENSUS: aggregate all collected results and produce final output

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/swarm.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class SwarmAgent[StateT: AgentState]:
    """Swarm pattern: dispatch to many workers, collect, then reach consensus.

    Notes:
    - The underlying engine executes nodes sequentially; true parallelism isn't
      performed at the graph level. For concurrency, worker/collector nodes can
      internally use BackgroundTaskManager or async to fan-out.
    - This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

    Nodes:
    - optional DISPATCH: prepare/plan the swarm task
    - WORKER_i: a set of worker nodes (Callable or ToolNode)
    - optional COLLECT: consolidate each worker's result into shared state
    - CONSENSUS: aggregate all collected results and produce final output
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        consensus_node: Callable | tuple[Callable, str],
        options: dict | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        resolved_workers = self._add_worker_nodes(workers)
        worker_sequence = resolved_workers

        options = options or {}
        dispatch_node = options.get("dispatch")
        collect_node = options.get("collect")
        followup_condition = options.get("followup_condition")

        dispatch_name = self._resolve_dispatch(dispatch_node)
        collect_info = self._resolve_collect(collect_node)
        consensus_name = self._resolve_consensus(consensus_node)

        entry = dispatch_name or worker_sequence[0]
        self._graph.set_entry_point(entry)
        if dispatch_name:
            self._graph.add_edge(dispatch_name, worker_sequence[0])

        self._wire_edges(worker_sequence, collect_info, consensus_name)

        if followup_condition is None:

            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        self._graph.add_conditional_edges(
            consensus_name,
            followup_condition,
            {entry: entry, END: END},
        )

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    # ---- helpers ----
    def _add_worker_nodes(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    ) -> list[str]:
        if not workers:
            raise ValueError("workers must be a non-empty dict")

        names: list[str] = []
        for key, fn in workers.items():
            if isinstance(fn, tuple):
                func, name = fn
            else:
                func, name = fn, key
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Worker '{key}' must be a callable or ToolNode")
            self._graph.add_node(name, func)
            names.append(name)
        return names

    def _resolve_dispatch(self, node: Callable | tuple[Callable, str] | None) -> str | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "DISPATCH"
        if not callable(func):
            raise ValueError("dispatch node must be callable")
        self._graph.add_node(name, func)
        return name

    def _resolve_collect(
        self,
        node: Callable | tuple[Callable, str] | None,
    ) -> tuple[Callable, str] | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "COLLECT"
        if not callable(func):
            raise ValueError("collect node must be callable")
        # Do not add a single shared collect node to avoid ambiguous routing.
        # We'll create per-worker collect nodes during wiring using this (func, base_name).
        return func, name

    def _resolve_consensus(self, node: Callable | tuple[Callable, str]) -> str:
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "CONSENSUS"
        if not callable(func):
            raise ValueError("consensus node must be callable")
        self._graph.add_node(name, func)
        return name

    def _wire_edges(
        self,
        worker_sequence: list[str],
        collect_info: tuple[Callable, str] | None,
        consensus_name: str,
    ) -> None:
        for i, wname in enumerate(worker_sequence):
            is_last = i == len(worker_sequence) - 1
            target = consensus_name if is_last else worker_sequence[i + 1]
            if collect_info:
                cfunc, base = collect_info
                cname = f"{base}_{i + 1}"
                # Create a dedicated collect node per worker to prevent loops
                self._graph.add_node(cname, cfunc)
                self._graph.add_edge(wname, cname)
                self._graph.add_edge(cname, target)
            else:
                self._graph.add_edge(wname, target)
Functions
__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/swarm.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    """Create the swarm agent and the ``StateGraph`` that ``compile`` wires.

    Each argument is passed to ``StateGraph[StateT]`` unchanged, under the
    keyword of the same name. The ``id_generator`` default is a single
    ``DefaultIDGenerator`` instance created when the function is defined.
    """
    graph_settings = {
        "state": state,
        "context_manager": context_manager,
        "publisher": publisher,
        "id_generator": id_generator,
        "container": container,
    }
    self._graph = StateGraph[StateT](**graph_settings)
compile
compile(workers, consensus_node, options=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/swarm.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def compile(
    self,
    workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    consensus_node: Callable | tuple[Callable, str],
    options: dict | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    """Register all swarm nodes, wire the chain and compile the graph.

    Args:
        workers: Mapping of worker nodes, handed to ``self._add_worker_nodes``,
            which returns the worker node names in chain order.
        consensus_node: Consensus node spec, handed to
            ``self._resolve_consensus``.
        options: Optional dict read for the keys ``"dispatch"``, ``"collect"``
            and ``"followup_condition"``. When no follow-up condition is given,
            a condition that always returns ``END`` is used.
        checkpointer: Passed through to ``self._graph.compile``.
        store: Passed through to ``self._graph.compile``.
        interrupt_before: Passed through to ``self._graph.compile``.
        interrupt_after: Passed through to ``self._graph.compile``.
        callback_manager: Passed through to ``self._graph.compile``.

    Returns:
        The result of ``self._graph.compile(...)``.
    """
    chain = self._add_worker_nodes(workers)

    opts = options or {}
    dispatch_spec = opts.get("dispatch")
    collect_spec = opts.get("collect")
    route_after_consensus = opts.get("followup_condition")

    dispatch_name = self._resolve_dispatch(dispatch_spec)
    collect_info = self._resolve_collect(collect_spec)
    consensus_name = self._resolve_consensus(consensus_node)

    # The run starts at the dispatch node when present, else at the first worker.
    first_worker = chain[0]
    entry = dispatch_name or first_worker
    self._graph.set_entry_point(entry)
    if dispatch_name:
        self._graph.add_edge(dispatch_name, first_worker)

    self._wire_edges(chain, collect_info, consensus_name)

    if route_after_consensus is None:

        def _always_end(_: AgentState) -> str:
            return END

        route_after_consensus = _always_end

    # From the consensus node the only routes are: back to the entry, or stop.
    routes = {entry: entry, END: END}
    self._graph.add_conditional_edges(consensus_name, route_after_consensus, routes)

    compile_settings = {
        "checkpointer": checkpointer,
        "store": store,
        "interrupt_before": interrupt_before,
        "interrupt_after": interrupt_after,
        "callback_manager": callback_manager,
    }
    return self._graph.compile(**compile_settings)