Skip to content

Swarm

Classes:

Name Description
SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute

StateT = TypeVar('StateT', bound=AgentState)

Classes

SwarmAgent

Swarm pattern: dispatch to many workers, collect, then reach consensus.

Notes: - The underlying engine executes nodes sequentially; true parallelism isn't performed at the graph level. For concurrency, worker/collector nodes can internally use BackgroundTaskManager or async to fan-out. - This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

Nodes: - optional DISPATCH: prepare/plan the swarm task - WORKER_i: a set of worker nodes (Callable or ToolNode) - optional COLLECT: consolidate each worker's result into shared state - CONSENSUS: aggregate all collected results and produce final output

Methods:

Name Description
__init__
compile
Source code in pyagenity/prebuilt/agent/swarm.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class SwarmAgent[StateT: AgentState]:
    """Swarm pattern: dispatch to many workers, collect, then reach consensus.

    Notes:
    - The underlying engine executes nodes sequentially; true parallelism isn't
      performed at the graph level. For concurrency, worker/collector nodes can
      internally use BackgroundTaskManager or async to fan-out.
    - This pattern wires a linear broadcast-collect chain ending in CONSENSUS.

    Nodes:
    - optional DISPATCH: prepare/plan the swarm task
    - WORKER_i: a set of worker nodes (Callable or ToolNode)
    - optional COLLECT: consolidate each worker's result into shared state
    - CONSENSUS: aggregate all collected results and produce final output
    """

    def __init__(
        self,
        state: StateT | None = None,
        context_manager: BaseContextManager[StateT] | None = None,
        publisher: BasePublisher | None = None,
        id_generator: BaseIDGenerator = DefaultIDGenerator(),
        container: InjectQ | None = None,
    ):
        self._graph = StateGraph[StateT](
            state=state,
            context_manager=context_manager,
            publisher=publisher,
            id_generator=id_generator,
            container=container,
        )

    def compile(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
        consensus_node: Callable | tuple[Callable, str],
        options: dict | None = None,
        checkpointer: BaseCheckpointer[StateT] | None = None,
        store: BaseStore | None = None,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        callback_manager: CallbackManager = CallbackManager(),
    ) -> CompiledGraph:
        resolved_workers = self._add_worker_nodes(workers)
        worker_sequence = resolved_workers

        options = options or {}
        dispatch_node = options.get("dispatch")
        collect_node = options.get("collect")
        followup_condition = options.get("followup_condition")

        dispatch_name = self._resolve_dispatch(dispatch_node)
        collect_info = self._resolve_collect(collect_node)
        consensus_name = self._resolve_consensus(consensus_node)

        entry = dispatch_name or worker_sequence[0]
        self._graph.set_entry_point(entry)
        if dispatch_name:
            self._graph.add_edge(dispatch_name, worker_sequence[0])

        self._wire_edges(worker_sequence, collect_info, consensus_name)

        if followup_condition is None:

            def _cond(_: AgentState) -> str:
                return END

            followup_condition = _cond

        self._graph.add_conditional_edges(
            consensus_name,
            followup_condition,
            {entry: entry, END: END},
        )

        return self._graph.compile(
            checkpointer=checkpointer,
            store=store,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            callback_manager=callback_manager,
        )

    # ---- helpers ----
    def _add_worker_nodes(
        self,
        workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    ) -> list[str]:
        if not workers:
            raise ValueError("workers must be a non-empty dict")

        names: list[str] = []
        for key, fn in workers.items():
            if isinstance(fn, tuple):
                func, name = fn
            else:
                func, name = fn, key
            if not (callable(func) or isinstance(func, ToolNode)):
                raise ValueError(f"Worker '{key}' must be a callable or ToolNode")
            self._graph.add_node(name, func)
            names.append(name)
        return names

    def _resolve_dispatch(self, node: Callable | tuple[Callable, str] | None) -> str | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "DISPATCH"
        if not callable(func):
            raise ValueError("dispatch node must be callable")
        self._graph.add_node(name, func)
        return name

    def _resolve_collect(
        self,
        node: Callable | tuple[Callable, str] | None,
    ) -> tuple[Callable, str] | None:
        if not node:
            return None
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "COLLECT"
        if not callable(func):
            raise ValueError("collect node must be callable")
        # Do not add a single shared collect node to avoid ambiguous routing.
        # We'll create per-worker collect nodes during wiring using this (func, base_name).
        return func, name

    def _resolve_consensus(self, node: Callable | tuple[Callable, str]) -> str:
        if isinstance(node, tuple):
            func, name = node
        else:
            func, name = node, "CONSENSUS"
        if not callable(func):
            raise ValueError("consensus node must be callable")
        self._graph.add_node(name, func)
        return name

    def _wire_edges(
        self,
        worker_sequence: list[str],
        collect_info: tuple[Callable, str] | None,
        consensus_name: str,
    ) -> None:
        for i, wname in enumerate(worker_sequence):
            is_last = i == len(worker_sequence) - 1
            target = consensus_name if is_last else worker_sequence[i + 1]
            if collect_info:
                cfunc, base = collect_info
                cname = f"{base}_{i + 1}"
                # Create a dedicated collect node per worker to prevent loops
                self._graph.add_node(cname, cfunc)
                self._graph.add_edge(wname, cname)
                self._graph.add_edge(cname, target)
            else:
                self._graph.add_edge(wname, target)

Functions

__init__
__init__(state=None, context_manager=None, publisher=None, id_generator=DefaultIDGenerator(), container=None)
Source code in pyagenity/prebuilt/agent/swarm.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    state: StateT | None = None,
    context_manager: BaseContextManager[StateT] | None = None,
    publisher: BasePublisher | None = None,
    id_generator: BaseIDGenerator = DefaultIDGenerator(),
    container: InjectQ | None = None,
):
    self._graph = StateGraph[StateT](
        state=state,
        context_manager=context_manager,
        publisher=publisher,
        id_generator=id_generator,
        container=container,
    )
compile
compile(workers, consensus_node, options=None, checkpointer=None, store=None, interrupt_before=None, interrupt_after=None, callback_manager=CallbackManager())
Source code in pyagenity/prebuilt/agent/swarm.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def compile(
    self,
    workers: dict[str, Callable | ToolNode | tuple[Callable | ToolNode, str]],
    consensus_node: Callable | tuple[Callable, str],
    options: dict | None = None,
    checkpointer: BaseCheckpointer[StateT] | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    callback_manager: CallbackManager = CallbackManager(),
) -> CompiledGraph:
    resolved_workers = self._add_worker_nodes(workers)
    worker_sequence = resolved_workers

    options = options or {}
    dispatch_node = options.get("dispatch")
    collect_node = options.get("collect")
    followup_condition = options.get("followup_condition")

    dispatch_name = self._resolve_dispatch(dispatch_node)
    collect_info = self._resolve_collect(collect_node)
    consensus_name = self._resolve_consensus(consensus_node)

    entry = dispatch_name or worker_sequence[0]
    self._graph.set_entry_point(entry)
    if dispatch_name:
        self._graph.add_edge(dispatch_name, worker_sequence[0])

    self._wire_edges(worker_sequence, collect_info, consensus_name)

    if followup_condition is None:

        def _cond(_: AgentState) -> str:
            return END

        followup_condition = _cond

    self._graph.add_conditional_edges(
        consensus_name,
        followup_condition,
        {entry: entry, END: END},
    )

    return self._graph.compile(
        checkpointer=checkpointer,
        store=store,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        callback_manager=callback_manager,
    )