Skip to content

State

State management for PyAgenity agent graphs.

This package provides schemas and context managers for agent state, execution tracking, and message context management. All core state classes are exported for use in agent workflows and custom state extensions.

Modules:

Name Description
agent_state

Agent state schema for PyAgenity agent graphs.

base_context

Abstract base class for context management in PyAgenity agent graphs.

execution_state

Execution state management for graph execution in PyAgenity.

message_context_manager

Message context management for agent state in PyAgenity.

Classes:

Name Description
AgentState

Common state schema that includes messages, context and internal execution metadata.

BaseContextManager

Abstract base class for context management in AI interactions.

ExecutionState

Tracks the internal execution state of a graph.

ExecutionStatus

Status of graph execution.

MessageContextManager

Manages the context field for AI interactions.

Attributes

__all__ module-attribute

__all__ = ['AgentState', 'BaseContextManager', 'ExecutionState', 'ExecutionStatus', 'MessageContextManager']

Classes

AgentState

Bases: BaseModel

Common state schema that includes messages, context and internal execution metadata.

This class can be subclassed to add application-specific fields while maintaining compatibility with the PyAgenity framework. All internal execution metadata is preserved through subclassing.

Notes: - execution_meta contains internal-only execution progress and interrupt info. - Users may subclass AgentState to add application fields; internal exec meta remains available to the runtime and will be persisted with the state. - When subclassing, add your fields but keep the core fields intact.

Example

class MyCustomState(AgentState): user_data: dict = Field(default_factory=dict) custom_field: str = "default"

Methods:

Name Description
advance_step

Advance the execution step in the metadata.

clear_interrupt

Clear any interrupt in the execution metadata.

complete

Mark the agent state as completed.

error

Mark the agent state as errored.

is_interrupted

Check if the agent state is currently interrupted.

is_running

Check if the agent state is currently running.

is_stopped_requested

Check if a stop has been requested for the agent state.

set_current_node

Set the current node in the execution metadata.

set_interrupt

Set an interrupt in the execution metadata.

Attributes:

Name Type Description
context Annotated[list[Message], add_messages]
context_summary str | None
execution_meta ExecutionState
Source code in pyagenity/state/agent_state.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class AgentState(BaseModel):
    """Common state schema that includes messages, context and internal execution metadata.

    This class can be subclassed to add application-specific fields while maintaining
    compatibility with the PyAgenity framework. All internal execution metadata
    is preserved through subclassing.

    Notes:
    - `execution_meta` contains internal-only execution progress and interrupt info.
    - Users may subclass `AgentState` to add application fields; internal exec meta remains
      available to the runtime and will be persisted with the state.
    - When subclassing, add your fields but keep the core fields intact.

    Example:
        class MyCustomState(AgentState):
            user_data: dict = Field(default_factory=dict)
            custom_field: str = "default"
    """

    context: Annotated[list[Message], add_messages] = Field(default_factory=list)
    context_summary: str | None = None
    # Internal execution metadata (kept private-ish but accessible to runtime)
    execution_meta: ExecMeta = Field(default_factory=lambda: ExecMeta(current_node=START))

    # Convenience delegation methods for execution meta so callers can use the same API
    def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
        """
        Set an interrupt in the execution metadata.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status: Execution status to set.
            data (dict | None): Optional additional interrupt data.
        """
        logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
        self.execution_meta.set_interrupt(node, reason, status, data)

    def clear_interrupt(self) -> None:
        """
        Clear any interrupt in the execution metadata.
        """
        logger.debug("Clearing interrupt")
        self.execution_meta.clear_interrupt()

    def is_running(self) -> bool:
        """
        Check if the agent state is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.execution_meta.is_running()
        logger.debug("State is_running: %s", running)
        return running

    def is_interrupted(self) -> bool:
        """
        Check if the agent state is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.execution_meta.is_interrupted()
        logger.debug("State is_interrupted: %s", interrupted)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance the execution step in the metadata.
        """
        old_step = self.execution_meta.step
        self.execution_meta.advance_step()
        logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)

    def set_current_node(self, node: str) -> None:
        """
        Set the current node in the execution metadata.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.execution_meta.current_node
        self.execution_meta.set_current_node(node)
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark the agent state as completed.
        """
        logger.info("Marking state as completed")
        self.execution_meta.complete()

    def error(self, error_msg: str) -> None:
        """
        Mark the agent state as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Setting state error: %s", error_msg)
        self.execution_meta.error(error_msg)

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for the agent state.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.execution_meta.is_stopped_requested()
        logger.debug("State is_stopped_requested: %s", stopped)
        return stopped

Attributes

context class-attribute instance-attribute
context = Field(default_factory=list)
context_summary class-attribute instance-attribute
context_summary = None
execution_meta class-attribute instance-attribute
execution_meta = Field(default_factory=lambda: ExecutionState(current_node=START))

Functions

advance_step
advance_step()

Advance the execution step in the metadata.

Source code in pyagenity/state/agent_state.py
90
91
92
93
94
95
96
def advance_step(self) -> None:
    """
    Advance the execution step in the metadata.
    """
    old_step = self.execution_meta.step
    self.execution_meta.advance_step()
    logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)
clear_interrupt
clear_interrupt()

Clear any interrupt in the execution metadata.

Source code in pyagenity/state/agent_state.py
61
62
63
64
65
66
def clear_interrupt(self) -> None:
    """
    Clear any interrupt in the execution metadata.
    """
    logger.debug("Clearing interrupt")
    self.execution_meta.clear_interrupt()
complete
complete()

Mark the agent state as completed.

Source code in pyagenity/state/agent_state.py
109
110
111
112
113
114
def complete(self) -> None:
    """
    Mark the agent state as completed.
    """
    logger.info("Marking state as completed")
    self.execution_meta.complete()
error
error(error_msg)

Mark the agent state as errored.

Parameters:

Name Type Description Default
error_msg
str

Error message to record.

required
Source code in pyagenity/state/agent_state.py
116
117
118
119
120
121
122
123
124
def error(self, error_msg: str) -> None:
    """
    Mark the agent state as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Setting state error: %s", error_msg)
    self.execution_meta.error(error_msg)
is_interrupted
is_interrupted()

Check if the agent state is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in pyagenity/state/agent_state.py
79
80
81
82
83
84
85
86
87
88
def is_interrupted(self) -> bool:
    """
    Check if the agent state is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.execution_meta.is_interrupted()
    logger.debug("State is_interrupted: %s", interrupted)
    return interrupted
is_running
is_running()

Check if the agent state is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in pyagenity/state/agent_state.py
68
69
70
71
72
73
74
75
76
77
def is_running(self) -> bool:
    """
    Check if the agent state is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.execution_meta.is_running()
    logger.debug("State is_running: %s", running)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for the agent state.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in pyagenity/state/agent_state.py
126
127
128
129
130
131
132
133
134
135
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for the agent state.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.execution_meta.is_stopped_requested()
    logger.debug("State is_stopped_requested: %s", stopped)
    return stopped
set_current_node
set_current_node(node)

Set the current node in the execution metadata.

Parameters:

Name Type Description Default
node
str

Node to set as current.

required
Source code in pyagenity/state/agent_state.py
 98
 99
100
101
102
103
104
105
106
107
def set_current_node(self, node: str) -> None:
    """
    Set the current node in the execution metadata.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.execution_meta.current_node
    self.execution_meta.set_current_node(node)
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set an interrupt in the execution metadata.

Parameters:

Name Type Description Default
node
str

Node where the interrupt occurred.

required
reason
str

Reason for the interrupt.

required
status

Execution status to set.

required
data
dict | None

Optional additional interrupt data.

None
Source code in pyagenity/state/agent_state.py
48
49
50
51
52
53
54
55
56
57
58
59
def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
    """
    Set an interrupt in the execution metadata.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status: Execution status to set.
        data (dict | None): Optional additional interrupt data.
    """
    logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
    self.execution_meta.set_interrupt(node, reason, status, data)

BaseContextManager

Bases: ABC

Abstract base class for context management in AI interactions.

Subclasses should implement trim_context as either a synchronous or asynchronous method. Generic over AgentState or its subclasses.

Methods:

Name Description
atrim_context

Trim context based on message count asynchronously.

trim_context

Trim context based on message count. Can be sync or async.

Source code in pyagenity/state/base_context.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class BaseContextManager[S](ABC):
    """
    Abstract base class for context management in AI interactions.

    Subclasses should implement `trim_context` as either a synchronous or asynchronous method.
    Generic over AgentState or its subclasses.
    """

    @abstractmethod
    def trim_context(self, state: S) -> S:
        """
        Trim context based on message count. Can be sync or async.

        Subclasses may implement as either a synchronous or asynchronous method.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context, either directly or as an awaitable.
        """
        raise NotImplementedError("Subclasses must implement this method (sync or async)")

    @abstractmethod
    async def atrim_context(self, state: S) -> S:
        """
        Trim context based on message count asynchronously.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context.
        """
        raise NotImplementedError("Subclasses must implement this method")

Functions

atrim_context abstractmethod async
atrim_context(state)

Trim context based on message count asynchronously.

Parameters:

Name Type Description Default
state
S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context.

Source code in pyagenity/state/base_context.py
43
44
45
46
47
48
49
50
51
52
53
54
@abstractmethod
async def atrim_context(self, state: S) -> S:
    """
    Trim context based on message count asynchronously.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context.
    """
    raise NotImplementedError("Subclasses must implement this method")
trim_context abstractmethod
trim_context(state)

Trim context based on message count. Can be sync or async.

Subclasses may implement as either a synchronous or asynchronous method.

Parameters:

Name Type Description Default
state
S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context, either directly or as an awaitable.

Source code in pyagenity/state/base_context.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@abstractmethod
def trim_context(self, state: S) -> S:
    """
    Trim context based on message count. Can be sync or async.

    Subclasses may implement as either a synchronous or asynchronous method.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context, either directly or as an awaitable.
    """
    raise NotImplementedError("Subclasses must implement this method (sync or async)")

ExecutionState

Bases: BaseModel

Tracks the internal execution state of a graph.

This class manages the execution progress, interrupt status, and internal data that should not be exposed to users.

Methods:

Name Description
advance_step

Advance to the next execution step.

clear_interrupt

Clear the interrupt state and resume execution.

complete

Mark execution as completed.

error

Mark execution as errored.

from_dict

Create an ExecutionState instance from a dictionary.

is_interrupted

Check if execution is currently interrupted.

is_running

Check if execution is currently running.

is_stopped_requested

Check if a stop has been requested for execution.

set_current_node

Update the current node in execution state.

set_interrupt

Set the interrupt state for execution.

Attributes:

Name Type Description
current_node str
internal_data dict[str, Any]
interrupt_data dict[str, Any] | None
interrupt_reason str | None
interrupted_node str | None
status ExecutionStatus
step int
stop_current_execution StopRequestStatus
thread_id str | None
Source code in pyagenity/state/execution_state.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class ExecutionState(BaseModel):
    """
    Tracks the internal execution state of a graph.

    This class manages the execution progress, interrupt status, and internal
    data that should not be exposed to users.
    """

    # Core execution tracking
    current_node: str
    step: int = 0
    status: ExecutionStatus = ExecutionStatus.RUNNING

    # Interrupt management
    interrupted_node: str | None = None
    interrupt_reason: str | None = None
    interrupt_data: dict[str, Any] | None = None

    # Thread/session identification
    thread_id: str | None = None

    # Stop Current Execution Flag
    stop_current_execution: StopRequestStatus = StopRequestStatus.NONE

    # Internal execution data (hidden from user)
    internal_data: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
        """
        Create an ExecutionState instance from a dictionary.

        Args:
            data (dict[str, Any]): Dictionary containing execution state fields.

        Returns:
            ExecutionState: The deserialized execution state object.
        """
        return cls.model_validate(
            {
                "current_node": data["current_node"],
                "step": data.get("step", 0),
                "status": ExecutionStatus(data.get("status", "running")),
                "interrupted_node": data.get("interrupted_node"),
                "interrupt_reason": data.get("interrupt_reason"),
                "interrupt_data": data.get("interrupt_data"),
                "thread_id": data.get("thread_id"),
                "internal_data": data.get("_internal_data", {}),
            }
        )

    def set_interrupt(
        self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
    ) -> None:
        """
        Set the interrupt state for execution.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status (ExecutionStatus): Status to set for the interrupt.
            data (dict[str, Any] | None): Optional additional interrupt data.
        """
        logger.debug(
            "Setting interrupt: node='%s', reason='%s', status='%s'",
            node,
            reason,
            status.value,
        )
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def clear_interrupt(self) -> None:
        """
        Clear the interrupt state and resume execution.
        """
        logger.debug("Clearing interrupt, resuming execution")
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING

    def is_interrupted(self) -> bool:
        """
        Check if execution is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.status in [
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        ]
        logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance to the next execution step.
        """
        old_step = self.step
        self.step += 1
        logger.debug("Advanced step from %d to %d", old_step, self.step)

    def set_current_node(self, node: str) -> None:
        """
        Update the current node in execution state.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.current_node
        self.current_node = node
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark execution as completed.
        """
        logger.info("Marking execution as completed")
        self.status = ExecutionStatus.COMPLETED

    def error(self, error_msg: str) -> None:
        """
        Mark execution as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Marking execution as errored: %s", error_msg)
        self.status = ExecutionStatus.ERROR
        self.internal_data["error"] = error_msg

    def is_running(self) -> bool:
        """
        Check if execution is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.status == ExecutionStatus.RUNNING
        logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
        return running

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for execution.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
        logger.debug(
            "Execution is_stopped_requested: %s (stop_current_execution: %s)",
            stopped,
            self.stop_current_execution.value,
        )
        return stopped

Attributes

current_node instance-attribute
current_node
internal_data class-attribute instance-attribute
internal_data = Field(default_factory=dict)
interrupt_data class-attribute instance-attribute
interrupt_data = None
interrupt_reason class-attribute instance-attribute
interrupt_reason = None
interrupted_node class-attribute instance-attribute
interrupted_node = None
status class-attribute instance-attribute
status = RUNNING
step class-attribute instance-attribute
step = 0
stop_current_execution class-attribute instance-attribute
stop_current_execution = NONE
thread_id class-attribute instance-attribute
thread_id = None

Functions

advance_step
advance_step()

Advance to the next execution step.

Source code in pyagenity/state/execution_state.py
134
135
136
137
138
139
140
def advance_step(self) -> None:
    """
    Advance to the next execution step.
    """
    old_step = self.step
    self.step += 1
    logger.debug("Advanced step from %d to %d", old_step, self.step)
clear_interrupt
clear_interrupt()

Clear the interrupt state and resume execution.

Source code in pyagenity/state/execution_state.py
110
111
112
113
114
115
116
117
118
def clear_interrupt(self) -> None:
    """
    Clear the interrupt state and resume execution.
    """
    logger.debug("Clearing interrupt, resuming execution")
    self.interrupted_node = None
    self.interrupt_reason = None
    self.interrupt_data = None
    self.status = ExecutionStatus.RUNNING
complete
complete()

Mark execution as completed.

Source code in pyagenity/state/execution_state.py
153
154
155
156
157
158
def complete(self) -> None:
    """
    Mark execution as completed.
    """
    logger.info("Marking execution as completed")
    self.status = ExecutionStatus.COMPLETED
error
error(error_msg)

Mark execution as errored.

Parameters:

Name Type Description Default
error_msg
str

Error message to record.

required
Source code in pyagenity/state/execution_state.py
160
161
162
163
164
165
166
167
168
169
def error(self, error_msg: str) -> None:
    """
    Mark execution as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Marking execution as errored: %s", error_msg)
    self.status = ExecutionStatus.ERROR
    self.internal_data["error"] = error_msg
from_dict classmethod
from_dict(data)

Create an ExecutionState instance from a dictionary.

Parameters:

Name Type Description Default
data
dict[str, Any]

Dictionary containing execution state fields.

required

Returns:

Name Type Description
ExecutionState ExecutionState

The deserialized execution state object.

Source code in pyagenity/state/execution_state.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
    """
    Create an ExecutionState instance from a dictionary.

    Args:
        data (dict[str, Any]): Dictionary containing execution state fields.

    Returns:
        ExecutionState: The deserialized execution state object.
    """
    return cls.model_validate(
        {
            "current_node": data["current_node"],
            "step": data.get("step", 0),
            "status": ExecutionStatus(data.get("status", "running")),
            "interrupted_node": data.get("interrupted_node"),
            "interrupt_reason": data.get("interrupt_reason"),
            "interrupt_data": data.get("interrupt_data"),
            "thread_id": data.get("thread_id"),
            "internal_data": data.get("_internal_data", {}),
        }
    )
is_interrupted
is_interrupted()

Check if execution is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in pyagenity/state/execution_state.py
120
121
122
123
124
125
126
127
128
129
130
131
132
def is_interrupted(self) -> bool:
    """
    Check if execution is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.status in [
        ExecutionStatus.INTERRUPTED_BEFORE,
        ExecutionStatus.INTERRUPTED_AFTER,
    ]
    logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
    return interrupted
is_running
is_running()

Check if execution is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in pyagenity/state/execution_state.py
171
172
173
174
175
176
177
178
179
180
def is_running(self) -> bool:
    """
    Check if execution is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.status == ExecutionStatus.RUNNING
    logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for execution.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in pyagenity/state/execution_state.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for execution.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
    logger.debug(
        "Execution is_stopped_requested: %s (stop_current_execution: %s)",
        stopped,
        self.stop_current_execution.value,
    )
    return stopped
set_current_node
set_current_node(node)

Update the current node in execution state.

Parameters:

Name Type Description Default
node
str

Node to set as current.

required
Source code in pyagenity/state/execution_state.py
142
143
144
145
146
147
148
149
150
151
def set_current_node(self, node: str) -> None:
    """
    Update the current node in execution state.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.current_node
    self.current_node = node
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set the interrupt state for execution.

Parameters:

Name Type Description Default
node
str

Node where the interrupt occurred.

required
reason
str

Reason for the interrupt.

required
status
ExecutionStatus

Status to set for the interrupt.

required
data
dict[str, Any] | None

Optional additional interrupt data.

None
Source code in pyagenity/state/execution_state.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def set_interrupt(
    self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
) -> None:
    """
    Set the interrupt state for execution.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status (ExecutionStatus): Status to set for the interrupt.
        data (dict[str, Any] | None): Optional additional interrupt data.
    """
    logger.debug(
        "Setting interrupt: node='%s', reason='%s', status='%s'",
        node,
        reason,
        status.value,
    )
    self.interrupted_node = node
    self.interrupt_reason = reason
    self.status = status
    self.interrupt_data = data

ExecutionStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
COMPLETED
ERROR
INTERRUPTED_AFTER
INTERRUPTED_BEFORE
RUNNING
Source code in pyagenity/state/execution_state.py
18
19
20
21
22
23
24
25
class ExecutionStatus(Enum):
    """Status of graph execution."""

    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"

Attributes

COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED_AFTER class-attribute instance-attribute
INTERRUPTED_AFTER = 'interrupted_after'
INTERRUPTED_BEFORE class-attribute instance-attribute
INTERRUPTED_BEFORE = 'interrupted_before'
RUNNING class-attribute instance-attribute
RUNNING = 'running'

MessageContextManager

Bases: BaseContextManager[S]

Manages the context field for AI interactions.

This class trims the context (message history) based on a maximum number of user messages, ensuring the first message (usually a system prompt) is always preserved. Generic over AgentState or its subclasses.

Methods:

Name Description
__init__

Initialize the MessageContextManager.

atrim_context

Asynchronous version of trim_context.

trim_context

Trim the context in the given AgentState based on the maximum number of user messages.

Attributes:

Name Type Description
max_messages
Source code in pyagenity/state/message_context_manager.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class MessageContextManager(BaseContextManager[S]):
    """
    Manages the context field for AI interactions.

    This class trims the context (message history) based on a maximum number of user messages,
    ensuring the first message (usually a system prompt) is always preserved.
    Generic over AgentState or its subclasses.
    """

    def __init__(self, max_messages: int = 10) -> None:
        """
        Initialize the MessageContextManager.

        Args:
            max_messages (int): Maximum number of
                user messages to keep in context. Default is 10.
        """
        self.max_messages = max_messages
        logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)

    def _trim(self, messages: list[Message]) -> list[Message] | None:
        """
        Trim messages keeping system messages and most recent user messages.

        Returns None if no trimming is needed, otherwise returns the trimmed list.
        """
        # check context is empty
        if not messages:
            logger.debug("No messages to trim; context is empty")
            return None

        # Count user messages
        user_message_count = sum(1 for msg in messages if msg.role == "user")

        if user_message_count <= self.max_messages:
            # no trimming needed
            logger.debug(
                "No trimming needed; context is within limits (%d user messages)",
                user_message_count,
            )
            return None

        # Separate system messages (usually at the beginning)
        system_messages = [msg for msg in messages if msg.role == "system"]
        non_system_messages = [msg for msg in messages if msg.role != "system"]

        # Keep only the most recent messages that include max_messages user messages
        final_non_system = []
        user_count = 0

        # Iterate from the end to keep most recent messages
        for msg in reversed(non_system_messages):
            if msg.role == "user":
                if user_count >= self.max_messages:
                    break
                user_count += 1
            final_non_system.insert(0, msg)  # Insert at beginning to maintain order

        # Combine system messages (at start) with trimmed conversation
        trimmed_messages = system_messages + final_non_system

        logger.debug(
            "Trimmed from %d to %d messages (%d user messages kept)",
            len(messages),
            len(trimmed_messages),
            user_count,
        )

        return trimmed_messages

    def trim_context(self, state: S) -> S:
        """
        Trim the context in the given AgentState based on the maximum number of user messages.

        The first message (typically a system prompt) is always preserved. Only the most recent
        user messages up to `max_messages` are kept, along with the first message.

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state

    async def atrim_context(self, state: S) -> S:
        """
        Asynchronous version of trim_context.

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state

Attributes

max_messages instance-attribute
max_messages = max_messages

Functions

__init__
__init__(max_messages=10)

Initialize the MessageContextManager.

Parameters:

Name Type Description Default
max_messages
int

Maximum number of user messages to keep in context. Default is 10.

10
Source code in pyagenity/state/message_context_manager.py
31
32
33
34
35
36
37
38
39
40
def __init__(self, max_messages: int = 10) -> None:
    """
    Initialize the MessageContextManager.

    Args:
        max_messages (int): Maximum number of
            user messages to keep in context. Default is 10.
    """
    self.max_messages = max_messages
    logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)
atrim_context async
atrim_context(state)

Asynchronous version of trim_context.

Parameters:

Name Type Description Default
state
AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in pyagenity/state/message_context_manager.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def atrim_context(self, state: S) -> S:
    """
    Asynchronous version of trim_context.

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state
trim_context
trim_context(state)

Trim the context in the given AgentState based on the maximum number of user messages.

The first message (typically a system prompt) is always preserved. Only the most recent user messages up to max_messages are kept, along with the first message.

Parameters:

Name Type Description Default
state
AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in pyagenity/state/message_context_manager.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def trim_context(self, state: S) -> S:
    """
    Trim the context in the given AgentState based on the maximum number of user messages.

    The first message (typically a system prompt) is always preserved. Only the most recent
    user messages up to `max_messages` are kept, along with the first message.

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state

Modules

agent_state

Agent state schema for PyAgenity agent graphs.

This module provides the AgentState class, which tracks message context, context summaries, and internal execution metadata for agent workflows. Supports subclassing for custom application fields.

Classes:

Name Description
AgentState

Common state schema that includes messages, context and internal execution metadata.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

AgentState

Bases: BaseModel

Common state schema that includes messages, context and internal execution metadata.

This class can be subclassed to add application-specific fields while maintaining compatibility with the PyAgenity framework. All internal execution metadata is preserved through subclassing.

Notes: - execution_meta contains internal-only execution progress and interrupt info. - Users may subclass AgentState to add application fields; internal exec meta remains available to the runtime and will be persisted with the state. - When subclassing, add your fields but keep the core fields intact.

Example

class MyCustomState(AgentState): user_data: dict = Field(default_factory=dict) custom_field: str = "default"

Methods:

Name Description
advance_step

Advance the execution step in the metadata.

clear_interrupt

Clear any interrupt in the execution metadata.

complete

Mark the agent state as completed.

error

Mark the agent state as errored.

is_interrupted

Check if the agent state is currently interrupted.

is_running

Check if the agent state is currently running.

is_stopped_requested

Check if a stop has been requested for the agent state.

set_current_node

Set the current node in the execution metadata.

set_interrupt

Set an interrupt in the execution metadata.

Attributes:

Name Type Description
context Annotated[list[Message], add_messages]
context_summary str | None
execution_meta ExecutionState
Source code in pyagenity/state/agent_state.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class AgentState(BaseModel):
    """Common state schema that includes messages, context and internal execution metadata.

    This class can be subclassed to add application-specific fields while maintaining
    compatibility with the PyAgenity framework. All internal execution metadata
    is preserved through subclassing.

    Notes:
    - `execution_meta` contains internal-only execution progress and interrupt info.
    - Users may subclass `AgentState` to add application fields; internal exec meta remains
      available to the runtime and will be persisted with the state.
    - When subclassing, add your fields but keep the core fields intact.

    Example:
        class MyCustomState(AgentState):
            user_data: dict = Field(default_factory=dict)
            custom_field: str = "default"
    """

    context: Annotated[list[Message], add_messages] = Field(default_factory=list)
    context_summary: str | None = None
    # Internal execution metadata (kept private-ish but accessible to runtime)
    execution_meta: ExecMeta = Field(default_factory=lambda: ExecMeta(current_node=START))

    # Convenience delegation methods for execution meta so callers can use the same API
    def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
        """
        Set an interrupt in the execution metadata.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status: Execution status to set.
            data (dict | None): Optional additional interrupt data.
        """
        logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
        self.execution_meta.set_interrupt(node, reason, status, data)

    def clear_interrupt(self) -> None:
        """
        Clear any interrupt in the execution metadata.
        """
        logger.debug("Clearing interrupt")
        self.execution_meta.clear_interrupt()

    def is_running(self) -> bool:
        """
        Check if the agent state is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.execution_meta.is_running()
        logger.debug("State is_running: %s", running)
        return running

    def is_interrupted(self) -> bool:
        """
        Check if the agent state is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.execution_meta.is_interrupted()
        logger.debug("State is_interrupted: %s", interrupted)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance the execution step in the metadata.
        """
        old_step = self.execution_meta.step
        self.execution_meta.advance_step()
        logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)

    def set_current_node(self, node: str) -> None:
        """
        Set the current node in the execution metadata.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.execution_meta.current_node
        self.execution_meta.set_current_node(node)
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark the agent state as completed.
        """
        logger.info("Marking state as completed")
        self.execution_meta.complete()

    def error(self, error_msg: str) -> None:
        """
        Mark the agent state as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Setting state error: %s", error_msg)
        self.execution_meta.error(error_msg)

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for the agent state.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.execution_meta.is_stopped_requested()
        logger.debug("State is_stopped_requested: %s", stopped)
        return stopped
Attributes
context class-attribute instance-attribute
context = Field(default_factory=list)
context_summary class-attribute instance-attribute
context_summary = None
execution_meta class-attribute instance-attribute
execution_meta = Field(default_factory=lambda: ExecutionState(current_node=START))
Functions
advance_step
advance_step()

Advance the execution step in the metadata.

Source code in pyagenity/state/agent_state.py
90
91
92
93
94
95
96
def advance_step(self) -> None:
    """
    Advance the execution step in the metadata.
    """
    old_step = self.execution_meta.step
    self.execution_meta.advance_step()
    logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)
clear_interrupt
clear_interrupt()

Clear any interrupt in the execution metadata.

Source code in pyagenity/state/agent_state.py
61
62
63
64
65
66
def clear_interrupt(self) -> None:
    """
    Clear any interrupt in the execution metadata.
    """
    logger.debug("Clearing interrupt")
    self.execution_meta.clear_interrupt()
complete
complete()

Mark the agent state as completed.

Source code in pyagenity/state/agent_state.py
109
110
111
112
113
114
def complete(self) -> None:
    """
    Mark the agent state as completed.
    """
    logger.info("Marking state as completed")
    self.execution_meta.complete()
error
error(error_msg)

Mark the agent state as errored.

Parameters:

Name Type Description Default
error_msg str

Error message to record.

required
Source code in pyagenity/state/agent_state.py
116
117
118
119
120
121
122
123
124
def error(self, error_msg: str) -> None:
    """
    Mark the agent state as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Setting state error: %s", error_msg)
    self.execution_meta.error(error_msg)
is_interrupted
is_interrupted()

Check if the agent state is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in pyagenity/state/agent_state.py
79
80
81
82
83
84
85
86
87
88
def is_interrupted(self) -> bool:
    """
    Check if the agent state is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.execution_meta.is_interrupted()
    logger.debug("State is_interrupted: %s", interrupted)
    return interrupted
is_running
is_running()

Check if the agent state is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in pyagenity/state/agent_state.py
68
69
70
71
72
73
74
75
76
77
def is_running(self) -> bool:
    """
    Check if the agent state is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.execution_meta.is_running()
    logger.debug("State is_running: %s", running)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for the agent state.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in pyagenity/state/agent_state.py
126
127
128
129
130
131
132
133
134
135
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for the agent state.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.execution_meta.is_stopped_requested()
    logger.debug("State is_stopped_requested: %s", stopped)
    return stopped
set_current_node
set_current_node(node)

Set the current node in the execution metadata.

Parameters:

Name Type Description Default
node str

Node to set as current.

required
Source code in pyagenity/state/agent_state.py
 98
 99
100
101
102
103
104
105
106
107
def set_current_node(self, node: str) -> None:
    """
    Set the current node in the execution metadata.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.execution_meta.current_node
    self.execution_meta.set_current_node(node)
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set an interrupt in the execution metadata.

Parameters:

Name Type Description Default
node str

Node where the interrupt occurred.

required
reason str

Reason for the interrupt.

required
status

Execution status to set.

required
data dict | None

Optional additional interrupt data.

None
Source code in pyagenity/state/agent_state.py
48
49
50
51
52
53
54
55
56
57
58
59
def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
    """
    Set an interrupt in the execution metadata.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status: Execution status to set.
        data (dict | None): Optional additional interrupt data.
    """
    logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
    self.execution_meta.set_interrupt(node, reason, status, data)

Functions

base_context

Abstract base class for context management in PyAgenity agent graphs.

This module provides BaseContextManager, which defines the interface for trimming and managing message context in agent state objects.

Classes:

Name Description
BaseContextManager

Abstract base class for context management in AI interactions.

Attributes:

Name Type Description
S
logger

Attributes

S module-attribute
S = TypeVar('S', bound=AgentState)
logger module-attribute
logger = getLogger(__name__)

Classes

BaseContextManager

Bases: ABC

Abstract base class for context management in AI interactions.

Subclasses should implement trim_context as either a synchronous or asynchronous method. Generic over AgentState or its subclasses.

Methods:

Name Description
atrim_context

Trim context based on message count asynchronously.

trim_context

Trim context based on message count. Can be sync or async.

Source code in pyagenity/state/base_context.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class BaseContextManager[S](ABC):
    """
    Abstract base class for context management in AI interactions.

    Subclasses should implement `trim_context` as either a synchronous or asynchronous method.
    Generic over AgentState or its subclasses.
    """

    @abstractmethod
    def trim_context(self, state: S) -> S:
        """
        Trim context based on message count. Can be sync or async.

        Subclasses may implement as either a synchronous or asynchronous method.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context, either directly or as an awaitable.
        """
        raise NotImplementedError("Subclasses must implement this method (sync or async)")

    @abstractmethod
    async def atrim_context(self, state: S) -> S:
        """
        Trim context based on message count asynchronously.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context.
        """
        raise NotImplementedError("Subclasses must implement this method")
Functions
atrim_context abstractmethod async
atrim_context(state)

Trim context based on message count asynchronously.

Parameters:

Name Type Description Default
state S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context.

Source code in pyagenity/state/base_context.py
43
44
45
46
47
48
49
50
51
52
53
54
@abstractmethod
async def atrim_context(self, state: S) -> S:
    """
    Trim context based on message count asynchronously.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context.
    """
    raise NotImplementedError("Subclasses must implement this method")
trim_context abstractmethod
trim_context(state)

Trim context based on message count. Can be sync or async.

Subclasses may implement as either a synchronous or asynchronous method.

Parameters:

Name Type Description Default
state S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context, either directly or as an awaitable.

Source code in pyagenity/state/base_context.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@abstractmethod
def trim_context(self, state: S) -> S:
    """
    Trim context based on message count. Can be sync or async.

    Subclasses may implement as either a synchronous or asynchronous method.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context, either directly or as an awaitable.
    """
    raise NotImplementedError("Subclasses must implement this method (sync or async)")

execution_state

Execution state management for graph execution in PyAgenity.

This module provides the ExecutionState class and related enums to track progress, interruptions, and pause/resume functionality for agent graph execution.

Classes:

Name Description
ExecutionState

Tracks the internal execution state of a graph.

ExecutionStatus

Status of graph execution.

StopRequestStatus

Status of graph execution.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

ExecutionState

Bases: BaseModel

Tracks the internal execution state of a graph.

This class manages the execution progress, interrupt status, and internal data that should not be exposed to users.

Methods:

Name Description
advance_step

Advance to the next execution step.

clear_interrupt

Clear the interrupt state and resume execution.

complete

Mark execution as completed.

error

Mark execution as errored.

from_dict

Create an ExecutionState instance from a dictionary.

is_interrupted

Check if execution is currently interrupted.

is_running

Check if execution is currently running.

is_stopped_requested

Check if a stop has been requested for execution.

set_current_node

Update the current node in execution state.

set_interrupt

Set the interrupt state for execution.

Attributes:

Name Type Description
current_node str
internal_data dict[str, Any]
interrupt_data dict[str, Any] | None
interrupt_reason str | None
interrupted_node str | None
status ExecutionStatus
step int
stop_current_execution StopRequestStatus
thread_id str | None
Source code in pyagenity/state/execution_state.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class ExecutionState(BaseModel):
    """
    Tracks the internal execution state of a graph.

    This class manages the execution progress, interrupt status, and internal
    data that should not be exposed to users.
    """

    # Core execution tracking
    current_node: str
    step: int = 0
    status: ExecutionStatus = ExecutionStatus.RUNNING

    # Interrupt management
    interrupted_node: str | None = None
    interrupt_reason: str | None = None
    interrupt_data: dict[str, Any] | None = None

    # Thread/session identification
    thread_id: str | None = None

    # Stop Current Execution Flag
    stop_current_execution: StopRequestStatus = StopRequestStatus.NONE

    # Internal execution data (hidden from user)
    internal_data: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
        """
        Create an ExecutionState instance from a dictionary.

        Args:
            data (dict[str, Any]): Dictionary containing execution state fields.

        Returns:
            ExecutionState: The deserialized execution state object.
        """
        return cls.model_validate(
            {
                "current_node": data["current_node"],
                "step": data.get("step", 0),
                "status": ExecutionStatus(data.get("status", "running")),
                "interrupted_node": data.get("interrupted_node"),
                "interrupt_reason": data.get("interrupt_reason"),
                "interrupt_data": data.get("interrupt_data"),
                "thread_id": data.get("thread_id"),
                "internal_data": data.get("_internal_data", {}),
            }
        )

    def set_interrupt(
        self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
    ) -> None:
        """
        Set the interrupt state for execution.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status (ExecutionStatus): Status to set for the interrupt.
            data (dict[str, Any] | None): Optional additional interrupt data.
        """
        logger.debug(
            "Setting interrupt: node='%s', reason='%s', status='%s'",
            node,
            reason,
            status.value,
        )
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def clear_interrupt(self) -> None:
        """
        Clear the interrupt state and resume execution.
        """
        logger.debug("Clearing interrupt, resuming execution")
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING

    def is_interrupted(self) -> bool:
        """
        Check if execution is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.status in [
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        ]
        logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance to the next execution step.
        """
        old_step = self.step
        self.step += 1
        logger.debug("Advanced step from %d to %d", old_step, self.step)

    def set_current_node(self, node: str) -> None:
        """
        Update the current node in execution state.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.current_node
        self.current_node = node
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark execution as completed.
        """
        logger.info("Marking execution as completed")
        self.status = ExecutionStatus.COMPLETED

    def error(self, error_msg: str) -> None:
        """
        Mark execution as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Marking execution as errored: %s", error_msg)
        self.status = ExecutionStatus.ERROR
        self.internal_data["error"] = error_msg

    def is_running(self) -> bool:
        """
        Check if execution is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.status == ExecutionStatus.RUNNING
        logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
        return running

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for execution.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
        logger.debug(
            "Execution is_stopped_requested: %s (stop_current_execution: %s)",
            stopped,
            self.stop_current_execution.value,
        )
        return stopped
Attributes
current_node instance-attribute
current_node
internal_data class-attribute instance-attribute
internal_data = Field(default_factory=dict)
interrupt_data class-attribute instance-attribute
interrupt_data = None
interrupt_reason class-attribute instance-attribute
interrupt_reason = None
interrupted_node class-attribute instance-attribute
interrupted_node = None
status class-attribute instance-attribute
status = RUNNING
step class-attribute instance-attribute
step = 0
stop_current_execution class-attribute instance-attribute
stop_current_execution = NONE
thread_id class-attribute instance-attribute
thread_id = None
Functions
advance_step
advance_step()

Advance to the next execution step.

Source code in pyagenity/state/execution_state.py
134
135
136
137
138
139
140
def advance_step(self) -> None:
    """
    Advance to the next execution step.
    """
    old_step = self.step
    self.step += 1
    logger.debug("Advanced step from %d to %d", old_step, self.step)
clear_interrupt
clear_interrupt()

Clear the interrupt state and resume execution.

Source code in pyagenity/state/execution_state.py
110
111
112
113
114
115
116
117
118
def clear_interrupt(self) -> None:
    """
    Clear the interrupt state and resume execution.
    """
    logger.debug("Clearing interrupt, resuming execution")
    self.interrupted_node = None
    self.interrupt_reason = None
    self.interrupt_data = None
    self.status = ExecutionStatus.RUNNING
complete
complete()

Mark execution as completed.

Source code in pyagenity/state/execution_state.py
153
154
155
156
157
158
def complete(self) -> None:
    """
    Mark execution as completed.
    """
    logger.info("Marking execution as completed")
    self.status = ExecutionStatus.COMPLETED
error
error(error_msg)

Mark execution as errored.

Parameters:

Name Type Description Default
error_msg str

Error message to record.

required
Source code in pyagenity/state/execution_state.py
160
161
162
163
164
165
166
167
168
169
def error(self, error_msg: str) -> None:
    """
    Mark execution as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Marking execution as errored: %s", error_msg)
    self.status = ExecutionStatus.ERROR
    self.internal_data["error"] = error_msg
from_dict classmethod
from_dict(data)

Create an ExecutionState instance from a dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing execution state fields.

required

Returns:

Name Type Description
ExecutionState ExecutionState

The deserialized execution state object.

Source code in pyagenity/state/execution_state.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
    """
    Create an ExecutionState instance from a dictionary.

    Args:
        data (dict[str, Any]): Dictionary containing execution state fields.

    Returns:
        ExecutionState: The deserialized execution state object.
    """
    return cls.model_validate(
        {
            "current_node": data["current_node"],
            "step": data.get("step", 0),
            "status": ExecutionStatus(data.get("status", "running")),
            "interrupted_node": data.get("interrupted_node"),
            "interrupt_reason": data.get("interrupt_reason"),
            "interrupt_data": data.get("interrupt_data"),
            "thread_id": data.get("thread_id"),
            "internal_data": data.get("_internal_data", {}),
        }
    )
is_interrupted
is_interrupted()

Check if execution is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in pyagenity/state/execution_state.py
120
121
122
123
124
125
126
127
128
129
130
131
132
def is_interrupted(self) -> bool:
    """
    Check if execution is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.status in [
        ExecutionStatus.INTERRUPTED_BEFORE,
        ExecutionStatus.INTERRUPTED_AFTER,
    ]
    logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
    return interrupted
is_running
is_running()

Check if execution is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in pyagenity/state/execution_state.py
171
172
173
174
175
176
177
178
179
180
def is_running(self) -> bool:
    """
    Check if execution is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.status == ExecutionStatus.RUNNING
    logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for execution.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in pyagenity/state/execution_state.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for execution.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
    logger.debug(
        "Execution is_stopped_requested: %s (stop_current_execution: %s)",
        stopped,
        self.stop_current_execution.value,
    )
    return stopped
set_current_node
set_current_node(node)

Update the current node in execution state.

Parameters:

Name Type Description Default
node str

Node to set as current.

required
Source code in pyagenity/state/execution_state.py
142
143
144
145
146
147
148
149
150
151
def set_current_node(self, node: str) -> None:
    """
    Update the current node in execution state.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.current_node
    self.current_node = node
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set the interrupt state for execution.

Parameters:

Name Type Description Default
node str

Node where the interrupt occurred.

required
reason str

Reason for the interrupt.

required
status ExecutionStatus

Status to set for the interrupt.

required
data dict[str, Any] | None

Optional additional interrupt data.

None
Source code in pyagenity/state/execution_state.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def set_interrupt(
    self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
) -> None:
    """
    Set the interrupt state for execution.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status (ExecutionStatus): Status to set for the interrupt.
        data (dict[str, Any] | None): Optional additional interrupt data.
    """
    logger.debug(
        "Setting interrupt: node='%s', reason='%s', status='%s'",
        node,
        reason,
        status.value,
    )
    self.interrupted_node = node
    self.interrupt_reason = reason
    self.status = status
    self.interrupt_data = data
ExecutionStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
COMPLETED
ERROR
INTERRUPTED_AFTER
INTERRUPTED_BEFORE
RUNNING
Source code in pyagenity/state/execution_state.py
18
19
20
21
22
23
24
25
class ExecutionStatus(Enum):
    """Status of graph execution."""

    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"
Attributes
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED_AFTER class-attribute instance-attribute
INTERRUPTED_AFTER = 'interrupted_after'
INTERRUPTED_BEFORE class-attribute instance-attribute
INTERRUPTED_BEFORE = 'interrupted_before'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
StopRequestStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
NONE
STOPPED
STOP_REQUESTED
Source code in pyagenity/state/execution_state.py
28
29
30
31
32
33
class StopRequestStatus(Enum):
    """Status of graph execution."""

    NONE = "none"
    STOP_REQUESTED = "stop_requested"
    STOPPED = "stopped"
Attributes
NONE class-attribute instance-attribute
NONE = 'none'
STOPPED class-attribute instance-attribute
STOPPED = 'stopped'
STOP_REQUESTED class-attribute instance-attribute
STOP_REQUESTED = 'stop_requested'

message_context_manager

Message context management for agent state in PyAgenity.

This module provides MessageContextManager, which trims and manages the message history (context) for agent interactions, ensuring efficient context window usage.

Classes:

Name Description
MessageContextManager

Manages the context field for AI interactions.

Attributes:

Name Type Description
S
logger

Attributes

S module-attribute
S = TypeVar('S', bound=AgentState)
logger module-attribute
logger = getLogger(__name__)

Classes

MessageContextManager

Bases: BaseContextManager[S]

Manages the context field for AI interactions.

This class trims the context (message history) based on a maximum number of user messages, ensuring the first message (usually a system prompt) is always preserved. Generic over AgentState or its subclasses.

Methods:

Name Description
__init__

Initialize the MessageContextManager.

atrim_context

Asynchronous version of trim_context.

trim_context

Trim the context in the given AgentState based on the maximum number of user messages.

Attributes:

Name Type Description
max_messages
Source code in pyagenity/state/message_context_manager.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class MessageContextManager(BaseContextManager[S]):
    """
    Manages the context field for AI interactions.

    This class trims the context (message history) based on a maximum number of user messages,
    ensuring the first message (usually a system prompt) is always preserved.
    Generic over AgentState or its subclasses.
    """

    def __init__(self, max_messages: int = 10) -> None:
        """
        Initialize the MessageContextManager.

        Args:
            max_messages (int): Maximum number of
                user messages to keep in context. Default is 10.
        """
        self.max_messages = max_messages
        logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)

    def _trim(self, messages: list[Message]) -> list[Message] | None:
        """
        Trim messages keeping system messages and most recent user messages.

        Returns None if no trimming is needed, otherwise returns the trimmed list.
        """
        # check context is empty
        if not messages:
            logger.debug("No messages to trim; context is empty")
            return None

        # Count user messages
        user_message_count = sum(1 for msg in messages if msg.role == "user")

        if user_message_count <= self.max_messages:
            # no trimming needed
            logger.debug(
                "No trimming needed; context is within limits (%d user messages)",
                user_message_count,
            )
            return None

        # Separate system messages (usually at the beginning)
        system_messages = [msg for msg in messages if msg.role == "system"]
        non_system_messages = [msg for msg in messages if msg.role != "system"]

        # Keep only the most recent messages that include max_messages user messages
        final_non_system = []
        user_count = 0

        # Iterate from the end to keep most recent messages
        for msg in reversed(non_system_messages):
            if msg.role == "user":
                if user_count >= self.max_messages:
                    break
                user_count += 1
            final_non_system.insert(0, msg)  # Insert at beginning to maintain order

        # Combine system messages (at start) with trimmed conversation
        trimmed_messages = system_messages + final_non_system

        logger.debug(
            "Trimmed from %d to %d messages (%d user messages kept)",
            len(messages),
            len(trimmed_messages),
            user_count,
        )

        return trimmed_messages

    def trim_context(self, state: S) -> S:
        """
        Trim the context in the given AgentState based on the maximum number of user messages.

        The first message (typically a system prompt) is always preserved. Only the most recent
        user messages up to `max_messages` are kept, along with the first message.

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state

    async def atrim_context(self, state: S) -> S:
        """
        Asynchronous version of trim_context.

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state
Attributes
max_messages instance-attribute
max_messages = max_messages
Functions
__init__
__init__(max_messages=10)

Initialize the MessageContextManager.

Parameters:

Name Type Description Default
max_messages int

Maximum number of user messages to keep in context. Default is 10.

10
Source code in pyagenity/state/message_context_manager.py
31
32
33
34
35
36
37
38
39
40
def __init__(self, max_messages: int = 10) -> None:
    """
    Initialize the MessageContextManager.

    Args:
        max_messages (int): Maximum number of
            user messages to keep in context. Default is 10.
    """
    self.max_messages = max_messages
    logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)
atrim_context async
atrim_context(state)

Asynchronous version of trim_context.

Parameters:

Name Type Description Default
state AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in pyagenity/state/message_context_manager.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def atrim_context(self, state: S) -> S:
    """
    Asynchronous version of trim_context.

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state
trim_context
trim_context(state)

Trim the context in the given AgentState based on the maximum number of user messages.

The first message (typically a system prompt) is always preserved. Only the most recent user messages up to max_messages are kept, along with the first message.

Parameters:

Name Type Description Default
state AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in pyagenity/state/message_context_manager.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def trim_context(self, state: S) -> S:
    """
    Trim the context in the given AgentState based on the maximum number of user messages.

    The first message (typically a system prompt) is always preserved. Only the most recent
    user messages up to `max_messages` are kept, along with the first message.

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state