Skip to content

Execution state

Execution state management for graph execution in PyAgenity.

This module provides the ExecutionState class and related enums to track progress, interruptions, and pause/resume functionality for agent graph execution.

Classes:

Name Description
ExecutionState

Tracks the internal execution state of a graph.

ExecutionStatus

Status of graph execution.

StopRequestStatus

Status of graph execution.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

ExecutionState

Bases: BaseModel

Tracks the internal execution state of a graph.

This class manages the execution progress, interrupt status, and internal data that should not be exposed to users.

Methods:

Name Description
advance_step

Advance to the next execution step.

clear_interrupt

Clear the interrupt state and resume execution.

complete

Mark execution as completed.

error

Mark execution as errored.

from_dict

Create an ExecutionState instance from a dictionary.

is_interrupted

Check if execution is currently interrupted.

is_running

Check if execution is currently running.

is_stopped_requested

Check if a stop has been requested for execution.

set_current_node

Update the current node in execution state.

set_interrupt

Set the interrupt state for execution.

Attributes:

Name Type Description
current_node str
internal_data dict[str, Any]
interrupt_data dict[str, Any] | None
interrupt_reason str | None
interrupted_node str | None
status ExecutionStatus
step int
stop_current_execution StopRequestStatus
thread_id str | None
Source code in pyagenity/state/execution_state.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class ExecutionState(BaseModel):
    """
    Tracks the internal execution state of a graph.

    This class manages the execution progress, interrupt status, and internal
    data that should not be exposed to users.
    """

    # Core execution tracking
    current_node: str
    step: int = 0
    status: ExecutionStatus = ExecutionStatus.RUNNING

    # Interrupt management
    interrupted_node: str | None = None
    interrupt_reason: str | None = None
    interrupt_data: dict[str, Any] | None = None

    # Thread/session identification
    thread_id: str | None = None

    # Stop Current Execution Flag
    stop_current_execution: StopRequestStatus = StopRequestStatus.NONE

    # Internal execution data (hidden from user)
    internal_data: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
        """
        Create an ExecutionState instance from a dictionary.

        Args:
            data (dict[str, Any]): Dictionary containing execution state fields.

        Returns:
            ExecutionState: The deserialized execution state object.
        """
        return cls.model_validate(
            {
                "current_node": data["current_node"],
                "step": data.get("step", 0),
                "status": ExecutionStatus(data.get("status", "running")),
                "interrupted_node": data.get("interrupted_node"),
                "interrupt_reason": data.get("interrupt_reason"),
                "interrupt_data": data.get("interrupt_data"),
                "thread_id": data.get("thread_id"),
                "internal_data": data.get("_internal_data", {}),
            }
        )

    def set_interrupt(
        self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
    ) -> None:
        """
        Set the interrupt state for execution.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status (ExecutionStatus): Status to set for the interrupt.
            data (dict[str, Any] | None): Optional additional interrupt data.
        """
        logger.debug(
            "Setting interrupt: node='%s', reason='%s', status='%s'",
            node,
            reason,
            status.value,
        )
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def clear_interrupt(self) -> None:
        """
        Clear the interrupt state and resume execution.
        """
        logger.debug("Clearing interrupt, resuming execution")
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING

    def is_interrupted(self) -> bool:
        """
        Check if execution is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.status in [
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        ]
        logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance to the next execution step.
        """
        old_step = self.step
        self.step += 1
        logger.debug("Advanced step from %d to %d", old_step, self.step)

    def set_current_node(self, node: str) -> None:
        """
        Update the current node in execution state.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.current_node
        self.current_node = node
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark execution as completed.
        """
        logger.info("Marking execution as completed")
        self.status = ExecutionStatus.COMPLETED

    def error(self, error_msg: str) -> None:
        """
        Mark execution as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Marking execution as errored: %s", error_msg)
        self.status = ExecutionStatus.ERROR
        self.internal_data["error"] = error_msg

    def is_running(self) -> bool:
        """
        Check if execution is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.status == ExecutionStatus.RUNNING
        logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
        return running

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for execution.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
        logger.debug(
            "Execution is_stopped_requested: %s (stop_current_execution: %s)",
            stopped,
            self.stop_current_execution.value,
        )
        return stopped

Attributes

current_node instance-attribute
current_node
internal_data class-attribute instance-attribute
internal_data = Field(default_factory=dict)
interrupt_data class-attribute instance-attribute
interrupt_data = None
interrupt_reason class-attribute instance-attribute
interrupt_reason = None
interrupted_node class-attribute instance-attribute
interrupted_node = None
status class-attribute instance-attribute
status = RUNNING
step class-attribute instance-attribute
step = 0
stop_current_execution class-attribute instance-attribute
stop_current_execution = NONE
thread_id class-attribute instance-attribute
thread_id = None

Functions

advance_step
advance_step()

Advance to the next execution step.

Source code in pyagenity/state/execution_state.py
134
135
136
137
138
139
140
def advance_step(self) -> None:
    """
    Advance to the next execution step.
    """
    old_step = self.step
    self.step += 1
    logger.debug("Advanced step from %d to %d", old_step, self.step)
clear_interrupt
clear_interrupt()

Clear the interrupt state and resume execution.

Source code in pyagenity/state/execution_state.py
110
111
112
113
114
115
116
117
118
def clear_interrupt(self) -> None:
    """
    Clear the interrupt state and resume execution.
    """
    logger.debug("Clearing interrupt, resuming execution")
    self.interrupted_node = None
    self.interrupt_reason = None
    self.interrupt_data = None
    self.status = ExecutionStatus.RUNNING
complete
complete()

Mark execution as completed.

Source code in pyagenity/state/execution_state.py
153
154
155
156
157
158
def complete(self) -> None:
    """
    Mark execution as completed.
    """
    logger.info("Marking execution as completed")
    self.status = ExecutionStatus.COMPLETED
error
error(error_msg)

Mark execution as errored.

Parameters:

Name Type Description Default
error_msg
str

Error message to record.

required
Source code in pyagenity/state/execution_state.py
160
161
162
163
164
165
166
167
168
169
def error(self, error_msg: str) -> None:
    """
    Mark execution as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Marking execution as errored: %s", error_msg)
    self.status = ExecutionStatus.ERROR
    self.internal_data["error"] = error_msg
from_dict classmethod
from_dict(data)

Create an ExecutionState instance from a dictionary.

Parameters:

Name Type Description Default
data
dict[str, Any]

Dictionary containing execution state fields.

required

Returns:

Name Type Description
ExecutionState ExecutionState

The deserialized execution state object.

Source code in pyagenity/state/execution_state.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
    """
    Create an ExecutionState instance from a dictionary.

    Args:
        data (dict[str, Any]): Dictionary containing execution state fields.

    Returns:
        ExecutionState: The deserialized execution state object.
    """
    return cls.model_validate(
        {
            "current_node": data["current_node"],
            "step": data.get("step", 0),
            "status": ExecutionStatus(data.get("status", "running")),
            "interrupted_node": data.get("interrupted_node"),
            "interrupt_reason": data.get("interrupt_reason"),
            "interrupt_data": data.get("interrupt_data"),
            "thread_id": data.get("thread_id"),
            "internal_data": data.get("_internal_data", {}),
        }
    )
is_interrupted
is_interrupted()

Check if execution is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in pyagenity/state/execution_state.py
120
121
122
123
124
125
126
127
128
129
130
131
132
def is_interrupted(self) -> bool:
    """
    Check if execution is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.status in [
        ExecutionStatus.INTERRUPTED_BEFORE,
        ExecutionStatus.INTERRUPTED_AFTER,
    ]
    logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
    return interrupted
is_running
is_running()

Check if execution is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in pyagenity/state/execution_state.py
171
172
173
174
175
176
177
178
179
180
def is_running(self) -> bool:
    """
    Check if execution is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.status == ExecutionStatus.RUNNING
    logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for execution.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in pyagenity/state/execution_state.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for execution.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
    logger.debug(
        "Execution is_stopped_requested: %s (stop_current_execution: %s)",
        stopped,
        self.stop_current_execution.value,
    )
    return stopped
set_current_node
set_current_node(node)

Update the current node in execution state.

Parameters:

Name Type Description Default
node
str

Node to set as current.

required
Source code in pyagenity/state/execution_state.py
142
143
144
145
146
147
148
149
150
151
def set_current_node(self, node: str) -> None:
    """
    Update the current node in execution state.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.current_node
    self.current_node = node
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set the interrupt state for execution.

Parameters:

Name Type Description Default
node
str

Node where the interrupt occurred.

required
reason
str

Reason for the interrupt.

required
status
ExecutionStatus

Status to set for the interrupt.

required
data
dict[str, Any] | None

Optional additional interrupt data.

None
Source code in pyagenity/state/execution_state.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def set_interrupt(
    self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
) -> None:
    """
    Set the interrupt state for execution.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status (ExecutionStatus): Status to set for the interrupt.
        data (dict[str, Any] | None): Optional additional interrupt data.
    """
    logger.debug(
        "Setting interrupt: node='%s', reason='%s', status='%s'",
        node,
        reason,
        status.value,
    )
    self.interrupted_node = node
    self.interrupt_reason = reason
    self.status = status
    self.interrupt_data = data

ExecutionStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
COMPLETED
ERROR
INTERRUPTED_AFTER
INTERRUPTED_BEFORE
RUNNING
Source code in pyagenity/state/execution_state.py
18
19
20
21
22
23
24
25
class ExecutionStatus(Enum):
    """Status of graph execution."""

    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"

Attributes

COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED_AFTER class-attribute instance-attribute
INTERRUPTED_AFTER = 'interrupted_after'
INTERRUPTED_BEFORE class-attribute instance-attribute
INTERRUPTED_BEFORE = 'interrupted_before'
RUNNING class-attribute instance-attribute
RUNNING = 'running'

StopRequestStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
NONE
STOPPED
STOP_REQUESTED
Source code in pyagenity/state/execution_state.py
28
29
30
31
32
33
class StopRequestStatus(Enum):
    """Status of graph execution."""

    NONE = "none"
    STOP_REQUESTED = "stop_requested"
    STOPPED = "stopped"

Attributes

NONE class-attribute instance-attribute
NONE = 'none'
STOPPED class-attribute instance-attribute
STOPPED = 'stopped'
STOP_REQUESTED class-attribute instance-attribute
STOP_REQUESTED = 'stop_requested'