
State

State management for TAF agent graphs.

This package provides schemas and context managers for agent state, execution tracking, and message context management. All core state classes are exported for use in agent workflows and custom state extensions.

Modules:

Name Description
agent_state

Agent state schema for TAF agent graphs.

base_context

Abstract base class for context management in TAF agent graphs.

execution_state

Execution state management for graph execution in TAF.

message

Message and content block primitives for agent graphs.

message_block
message_context_manager

Message context management for agent state in TAF.

reducers

Reducer utilities for merging and replacing lists and values in agent state.

stream_chunks

Stream chunk primitives for unified streaming data handling.

Classes:

Name Description
AgentState

Common state schema that includes messages, context and internal execution metadata.

AnnotationBlock

Annotation content block for messages.

AnnotationRef

Reference to annotation metadata (e.g., citation, note).

AudioBlock

Audio content block for messages.

BaseContextManager

Abstract base class for context management in AI interactions.

DataBlock

Data content block for messages.

DocumentBlock

Document content block for messages.

ErrorBlock

Error content block for messages.

ExecutionState

Tracks the internal execution state of a graph.

ExecutionStatus

Status of graph execution.

ImageBlock

Image content block for messages.

MediaRef

Reference to media content (image/audio/video/document/data).

Message

Represents a message in a conversation, including content, role, metadata, and token usage.

MessageContextManager

Manages the context field for AI interactions.

ReasoningBlock

Reasoning content block for messages.

StreamChunk

Unified wrapper for different types of streaming data.

StreamEvent
TextBlock

Text content block for messages.

TokenUsages

Tracks token usage statistics for a message or model response.

ToolCallBlock

Tool call content block for messages.

ToolResultBlock

Tool result content block for messages.

VideoBlock

Video content block for messages.

Functions:

Name Description
add_messages

Adds messages to the list, avoiding duplicates by message_id.

append_items

Appends items to a list, avoiding duplicates by item.id.

remove_tool_messages

Remove COMPLETED tool interaction sequences from the message list.

replace_messages

Replaces the entire message list with a new one.

replace_value

Replaces a value with another.
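The dedup-by-id behavior described for add_messages can be sketched as follows. This is a hypothetical plain-dict illustration of the documented contract, not the package's actual implementation (the real reducer operates on Message objects):

```python
# Hypothetical sketch of add_messages-style merging: new messages are appended
# only when their message_id has not been seen before.
def merge_messages(existing: list[dict], new: list[dict]) -> list[dict]:
    seen = {m["message_id"] for m in existing}
    merged = list(existing)
    for msg in new:
        if msg["message_id"] not in seen:
            merged.append(msg)
            seen.add(msg["message_id"])
    return merged
```

append_items follows the same pattern, keyed on item.id instead of message_id.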

Attributes:

Name Type Description
ContentBlock

Attributes

ContentBlock module-attribute

ContentBlock = Annotated[Union[TextBlock, ImageBlock, AudioBlock, VideoBlock, DocumentBlock, DataBlock, ToolCallBlock, RemoteToolCallBlock, ToolResultBlock, ReasoningBlock, AnnotationBlock, ErrorBlock], Field(discriminator='type')]
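The union above relies on pydantic's discriminated-union support: the literal type field on each block selects which model validates the payload. A minimal plain-Python sketch of the same dispatch idea, without the pydantic dependency (TextBlock and ErrorBlock here are simplified stand-ins for the real models):

```python
# Hypothetical sketch of type-discriminated content blocks. The real package
# uses Field(discriminator='type') on a pydantic Annotated union; this version
# shows the same dispatch-by-discriminator idea with plain dataclasses.
from dataclasses import dataclass

@dataclass
class TextBlock:
    text: str
    type: str = "text"

@dataclass
class ErrorBlock:
    message: str
    type: str = "error"

_BLOCK_TYPES = {"text": TextBlock, "error": ErrorBlock}

def parse_block(raw: dict):
    """Pick the block class named by the 'type' discriminator field."""
    cls = _BLOCK_TYPES[raw["type"]]
    fields = {k: v for k, v in raw.items() if k != "type"}
    return cls(**fields)
```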

__all__ module-attribute

__all__ = ['AgentState', 'AnnotationBlock', 'AnnotationRef', 'AudioBlock', 'BaseContextManager', 'ContentBlock', 'DataBlock', 'DocumentBlock', 'ErrorBlock', 'ExecutionState', 'ExecutionStatus', 'ImageBlock', 'MediaRef', 'Message', 'MessageContextManager', 'ReasoningBlock', 'StreamChunk', 'StreamEvent', 'TextBlock', 'TokenUsages', 'ToolCallBlock', 'ToolResultBlock', 'VideoBlock', 'add_messages', 'append_items', 'remove_tool_messages', 'replace_messages', 'replace_value']

Classes

AgentState

Bases: BaseModel

Common state schema that includes messages, context and internal execution metadata.

This class can be subclassed to add application-specific fields while maintaining compatibility with the TAF framework. All internal execution metadata is preserved through subclassing.

Notes:

- execution_meta contains internal-only execution progress and interrupt info.
- Users may subclass AgentState to add application fields; internal exec meta remains available to the runtime and will be persisted with the state.
- When subclassing, add your fields but keep the core fields intact.

Example

    class MyCustomState(AgentState):
        user_data: dict = Field(default_factory=dict)
        custom_field: str = "default"

Methods:

Name Description
advance_step

Advance the execution step in the metadata.

clear_interrupt

Clear any interrupt in the execution metadata.

complete

Mark the agent state as completed.

error

Mark the agent state as errored.

is_interrupted

Check if the agent state is currently interrupted.

is_running

Check if the agent state is currently running.

is_stopped_requested

Check if a stop has been requested for the agent state.

set_current_node

Set the current node in the execution metadata.

set_interrupt

Set an interrupt in the execution metadata.

Attributes:

Name Type Description
context Annotated[list[Message], add_messages]
context_summary str | None
execution_meta ExecutionState
Source code in agentflow/state/agent_state.py
class AgentState(BaseModel):
    """Common state schema that includes messages, context and internal execution metadata.

    This class can be subclassed to add application-specific fields while maintaining
    compatibility with the TAF framework. All internal execution metadata
    is preserved through subclassing.

    Notes:
    - `execution_meta` contains internal-only execution progress and interrupt info.
    - Users may subclass `AgentState` to add application fields; internal exec meta remains
      available to the runtime and will be persisted with the state.
    - When subclassing, add your fields but keep the core fields intact.

    Example:
        class MyCustomState(AgentState):
            user_data: dict = Field(default_factory=dict)
            custom_field: str = "default"
    """

    context: Annotated[list[Message], add_messages] = Field(default_factory=list)
    context_summary: str | None = None
    # Internal execution metadata (kept private-ish but accessible to runtime)
    execution_meta: ExecMeta = Field(default_factory=lambda: ExecMeta(current_node=START))

    # Convenience delegation methods for execution meta so callers can use the same API
    def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
        """
        Set an interrupt in the execution metadata.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status: Execution status to set.
            data (dict | None): Optional additional interrupt data.
        """
        logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
        self.execution_meta.set_interrupt(node, reason, status, data)

    def clear_interrupt(self) -> None:
        """
        Clear any interrupt in the execution metadata.
        """
        logger.debug("Clearing interrupt")
        self.execution_meta.clear_interrupt()

    def is_running(self) -> bool:
        """
        Check if the agent state is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.execution_meta.is_running()
        logger.debug("State is_running: %s", running)
        return running

    def is_interrupted(self) -> bool:
        """
        Check if the agent state is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.execution_meta.is_interrupted()
        logger.debug("State is_interrupted: %s", interrupted)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance the execution step in the metadata.
        """
        old_step = self.execution_meta.step
        self.execution_meta.advance_step()
        logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)

    def set_current_node(self, node: str) -> None:
        """
        Set the current node in the execution metadata.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.execution_meta.current_node
        self.execution_meta.set_current_node(node)
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark the agent state as completed.
        """
        logger.info("Marking state as completed")
        self.execution_meta.complete()

    def error(self, error_msg: str) -> None:
        """
        Mark the agent state as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Setting state error: %s", error_msg)
        self.execution_meta.error(error_msg)

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for the agent state.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.execution_meta.is_stopped_requested()
        logger.debug("State is_stopped_requested: %s", stopped)
        return stopped

Attributes

context class-attribute instance-attribute
context = Field(default_factory=list)
context_summary class-attribute instance-attribute
context_summary = None
execution_meta class-attribute instance-attribute
execution_meta = Field(default_factory=lambda: ExecutionState(current_node=START))

Functions

advance_step
advance_step()

Advance the execution step in the metadata.

Source code in agentflow/state/agent_state.py
def advance_step(self) -> None:
    """
    Advance the execution step in the metadata.
    """
    old_step = self.execution_meta.step
    self.execution_meta.advance_step()
    logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)
clear_interrupt
clear_interrupt()

Clear any interrupt in the execution metadata.

Source code in agentflow/state/agent_state.py
def clear_interrupt(self) -> None:
    """
    Clear any interrupt in the execution metadata.
    """
    logger.debug("Clearing interrupt")
    self.execution_meta.clear_interrupt()
complete
complete()

Mark the agent state as completed.

Source code in agentflow/state/agent_state.py
def complete(self) -> None:
    """
    Mark the agent state as completed.
    """
    logger.info("Marking state as completed")
    self.execution_meta.complete()
error
error(error_msg)

Mark the agent state as errored.

Parameters:

Name Type Description Default
error_msg
str

Error message to record.

required
Source code in agentflow/state/agent_state.py
def error(self, error_msg: str) -> None:
    """
    Mark the agent state as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Setting state error: %s", error_msg)
    self.execution_meta.error(error_msg)
is_interrupted
is_interrupted()

Check if the agent state is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in agentflow/state/agent_state.py
def is_interrupted(self) -> bool:
    """
    Check if the agent state is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.execution_meta.is_interrupted()
    logger.debug("State is_interrupted: %s", interrupted)
    return interrupted
is_running
is_running()

Check if the agent state is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in agentflow/state/agent_state.py
def is_running(self) -> bool:
    """
    Check if the agent state is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.execution_meta.is_running()
    logger.debug("State is_running: %s", running)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for the agent state.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in agentflow/state/agent_state.py
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for the agent state.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.execution_meta.is_stopped_requested()
    logger.debug("State is_stopped_requested: %s", stopped)
    return stopped
set_current_node
set_current_node(node)

Set the current node in the execution metadata.

Parameters:

Name Type Description Default
node
str

Node to set as current.

required
Source code in agentflow/state/agent_state.py
def set_current_node(self, node: str) -> None:
    """
    Set the current node in the execution metadata.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.execution_meta.current_node
    self.execution_meta.set_current_node(node)
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set an interrupt in the execution metadata.

Parameters:

Name Type Description Default
node
str

Node where the interrupt occurred.

required
reason
str

Reason for the interrupt.

required
status

Execution status to set.

required
data
dict | None

Optional additional interrupt data.

None
Source code in agentflow/state/agent_state.py
def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
    """
    Set an interrupt in the execution metadata.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status: Execution status to set.
        data (dict | None): Optional additional interrupt data.
    """
    logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
    self.execution_meta.set_interrupt(node, reason, status, data)

AnnotationBlock

Bases: BaseModel

Annotation content block for messages.

Attributes:

Name Type Description
type Literal['annotation']

Block type discriminator.

kind Literal['citation', 'note']

Kind of annotation.

refs list[AnnotationRef]

List of annotation references.

spans list[tuple[int, int]] | None

Spans covered by the annotation.

Source code in agentflow/state/message_block.py
class AnnotationBlock(BaseModel):
    """
    Annotation content block for messages.

    Attributes:
        type (Literal["annotation"]): Block type discriminator.
        kind (Literal["citation", "note"]): Kind of annotation.
        refs (list[AnnotationRef]): List of annotation references.
        spans (list[tuple[int, int]] | None): Spans covered by the annotation.
    """

    type: Literal["annotation"] = "annotation"
    kind: Literal["citation", "note"] = "citation"
    refs: list[AnnotationRef] = Field(default_factory=list)
    spans: list[tuple[int, int]] | None = None

Attributes

kind class-attribute instance-attribute
kind = 'citation'
refs class-attribute instance-attribute
refs = Field(default_factory=list)
spans class-attribute instance-attribute
spans = None
type class-attribute instance-attribute
type = 'annotation'

AnnotationRef

Bases: BaseModel

Reference to annotation metadata (e.g., citation, note).

Attributes:

Name Type Description
url str | None

URL to annotation source.

file_id str | None

Provider-managed file ID.

page int | None

Page number (if applicable).

index int | None

Index within the annotation source.

title str | None

Title of the annotation.

Source code in agentflow/state/message_block.py
class AnnotationRef(BaseModel):
    """
    Reference to annotation metadata (e.g., citation, note).

    Attributes:
        url (str | None): URL to annotation source.
        file_id (str | None): Provider-managed file ID.
        page (int | None): Page number (if applicable).
        index (int | None): Index within the annotation source.
        title (str | None): Title of the annotation.
    """

    url: str | None = None
    file_id: str | None = None
    page: int | None = None
    index: int | None = None
    title: str | None = None

Attributes

file_id class-attribute instance-attribute
file_id = None
index class-attribute instance-attribute
index = None
page class-attribute instance-attribute
page = None
title class-attribute instance-attribute
title = None
url class-attribute instance-attribute
url = None

AudioBlock

Bases: BaseModel

Audio content block for messages.

Attributes:

Name Type Description
type Literal['audio']

Block type discriminator.

media MediaRef

Reference to audio media.

transcript str | None

Transcript of audio.

sample_rate int | None

Sample rate in Hz.

channels int | None

Number of audio channels.

Source code in agentflow/state/message_block.py
class AudioBlock(BaseModel):
    """
    Audio content block for messages.

    Attributes:
        type (Literal["audio"]): Block type discriminator.
        media (MediaRef): Reference to audio media.
        transcript (str | None): Transcript of audio.
        sample_rate (int | None): Sample rate in Hz.
        channels (int | None): Number of audio channels.
    """

    type: Literal["audio"] = "audio"
    media: MediaRef
    transcript: str | None = None
    sample_rate: int | None = None
    channels: int | None = None

Attributes

channels class-attribute instance-attribute
channels = None
media instance-attribute
media
sample_rate class-attribute instance-attribute
sample_rate = None
transcript class-attribute instance-attribute
transcript = None
type class-attribute instance-attribute
type = 'audio'

BaseContextManager

Bases: ABC

Abstract base class for context management in AI interactions.

Subclasses should implement trim_context as either a synchronous or asynchronous method. Generic over AgentState or its subclasses.

Methods:

Name Description
atrim_context

Trim context based on message count asynchronously.

trim_context

Trim context based on message count. Can be sync or async.

Source code in agentflow/state/base_context.py
class BaseContextManager[S](ABC):
    """
    Abstract base class for context management in AI interactions.

    Subclasses should implement `trim_context` as either a synchronous or asynchronous method.
    Generic over AgentState or its subclasses.
    """

    @abstractmethod
    def trim_context(self, state: S) -> S:
        """
        Trim context based on message count. Can be sync or async.

        Subclasses may implement as either a synchronous or asynchronous method.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context, either directly or as an awaitable.
        """
        raise NotImplementedError("Subclasses must implement this method (sync or async)")

    @abstractmethod
    async def atrim_context(self, state: S) -> S:
        """
        Trim context based on message count asynchronously.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context.
        """
        raise NotImplementedError("Subclasses must implement this method")

Functions

atrim_context abstractmethod async
atrim_context(state)

Trim context based on message count asynchronously.

Parameters:

Name Type Description Default
state
S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context.

Source code in agentflow/state/base_context.py
@abstractmethod
async def atrim_context(self, state: S) -> S:
    """
    Trim context based on message count asynchronously.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context.
    """
    raise NotImplementedError("Subclasses must implement this method")
trim_context abstractmethod
trim_context(state)

Trim context based on message count. Can be sync or async.

Subclasses may implement as either a synchronous or asynchronous method.

Parameters:

Name Type Description Default
state
S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context, either directly or as an awaitable.

Source code in agentflow/state/base_context.py
@abstractmethod
def trim_context(self, state: S) -> S:
    """
    Trim context based on message count. Can be sync or async.

    Subclasses may implement as either a synchronous or asynchronous method.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context, either directly or as an awaitable.
    """
    raise NotImplementedError("Subclasses must implement this method (sync or async)")

DataBlock

Bases: BaseModel

Data content block for messages.

Attributes:

Name Type Description
type Literal['data']

Block type discriminator.

mime_type str

MIME type of the data.

data_base64 str | None

Base64-encoded data.

media MediaRef | None

Reference to associated media.

Source code in agentflow/state/message_block.py
class DataBlock(BaseModel):
    """
    Data content block for messages.

    Attributes:
        type (Literal["data"]): Block type discriminator.
        mime_type (str): MIME type of the data.
        data_base64 (str | None): Base64-encoded data.
        media (MediaRef | None): Reference to associated media.
    """

    type: Literal["data"] = "data"
    mime_type: str
    data_base64: str | None = None
    media: MediaRef | None = None

Attributes

data_base64 class-attribute instance-attribute
data_base64 = None
media class-attribute instance-attribute
media = None
mime_type instance-attribute
mime_type
type class-attribute instance-attribute
type = 'data'

DocumentBlock

Bases: BaseModel

Document content block for messages.

Attributes:

Name Type Description
type Literal['document']

Block type discriminator.

media MediaRef

Reference to document media.

pages list[int] | None

List of page numbers.

excerpt str | None

Excerpt from the document.

Source code in agentflow/state/message_block.py
class DocumentBlock(BaseModel):
    """
    Document content block for messages.

    Attributes:
        type (Literal["document"]): Block type discriminator.
        media (MediaRef): Reference to document media.
        pages (list[int] | None): List of page numbers.
        excerpt (str | None): Excerpt from the document.
    """

    type: Literal["document"] = "document"
    media: MediaRef
    pages: list[int] | None = None
    excerpt: str | None = None

Attributes

excerpt class-attribute instance-attribute
excerpt = None
media instance-attribute
media
pages class-attribute instance-attribute
pages = None
type class-attribute instance-attribute
type = 'document'

ErrorBlock

Bases: BaseModel

Error content block for messages.

Attributes:

Name Type Description
type Literal['error']

Block type discriminator.

message str

Error message.

code str | None

Error code.

data dict[str, Any] | None

Additional error data.

Source code in agentflow/state/message_block.py
class ErrorBlock(BaseModel):
    """
    Error content block for messages.

    Attributes:
        type (Literal["error"]): Block type discriminator.
        message (str): Error message.
        code (str | None): Error code.
        data (dict[str, Any] | None): Additional error data.
    """

    type: Literal["error"] = "error"
    message: str
    code: str | None = None
    data: dict[str, Any] | None = None

Attributes

code class-attribute instance-attribute
code = None
data class-attribute instance-attribute
data = None
message instance-attribute
message
type class-attribute instance-attribute
type = 'error'

ExecutionState

Bases: BaseModel

Tracks the internal execution state of a graph.

This class manages the execution progress, interrupt status, and internal data that should not be exposed to users.

Methods:

Name Description
advance_step

Advance to the next execution step.

clear_interrupt

Clear the interrupt state and resume execution.

complete

Mark execution as completed.

error

Mark execution as errored.

from_dict

Create an ExecutionState instance from a dictionary.

is_interrupted

Check if execution is currently interrupted.

is_running

Check if execution is currently running.

is_stopped_requested

Check if a stop has been requested for execution.

set_current_node

Update the current node in execution state.

set_interrupt

Set the interrupt state for execution.

Attributes:

Name Type Description
current_node str
internal_data dict[str, Any]
interrupt_data dict[str, Any] | None
interrupt_reason str | None
interrupted_node str | None
status ExecutionStatus
step int
stop_current_execution StopRequestStatus
thread_id str | None
Source code in agentflow/state/execution_state.py
class ExecutionState(BaseModel):
    """
    Tracks the internal execution state of a graph.

    This class manages the execution progress, interrupt status, and internal
    data that should not be exposed to users.
    """

    # Core execution tracking
    current_node: str
    step: int = 0
    status: ExecutionStatus = ExecutionStatus.RUNNING

    # Interrupt management
    interrupted_node: str | None = None
    interrupt_reason: str | None = None
    interrupt_data: dict[str, Any] | None = None

    # Thread/session identification
    thread_id: str | None = None

    # Stop Current Execution Flag
    stop_current_execution: StopRequestStatus = StopRequestStatus.NONE

    # Internal execution data (hidden from user)
    internal_data: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
        """
        Create an ExecutionState instance from a dictionary.

        Args:
            data (dict[str, Any]): Dictionary containing execution state fields.

        Returns:
            ExecutionState: The deserialized execution state object.
        """
        return cls.model_validate(
            {
                "current_node": data["current_node"],
                "step": data.get("step", 0),
                "status": ExecutionStatus(data.get("status", "running")),
                "interrupted_node": data.get("interrupted_node"),
                "interrupt_reason": data.get("interrupt_reason"),
                "interrupt_data": data.get("interrupt_data"),
                "thread_id": data.get("thread_id"),
                "internal_data": data.get("_internal_data", {}),
            }
        )

    def set_interrupt(
        self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
    ) -> None:
        """
        Set the interrupt state for execution.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status (ExecutionStatus): Status to set for the interrupt.
            data (dict[str, Any] | None): Optional additional interrupt data.
        """
        logger.debug(
            "Setting interrupt: node='%s', reason='%s', status='%s'",
            node,
            reason,
            status.value,
        )
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def clear_interrupt(self) -> None:
        """
        Clear the interrupt state and resume execution.
        """
        logger.debug("Clearing interrupt, resuming execution")
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING

    def is_interrupted(self) -> bool:
        """
        Check if execution is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.status in [
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        ]
        logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance to the next execution step.
        """
        old_step = self.step
        self.step += 1
        logger.debug("Advanced step from %d to %d", old_step, self.step)

    def set_current_node(self, node: str) -> None:
        """
        Update the current node in execution state.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.current_node
        self.current_node = node
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark execution as completed.
        """
        logger.info("Marking execution as completed")
        self.status = ExecutionStatus.COMPLETED

    def error(self, error_msg: str) -> None:
        """
        Mark execution as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Marking execution as errored: %s", error_msg)
        self.status = ExecutionStatus.ERROR
        self.internal_data["error"] = error_msg

    def is_running(self) -> bool:
        """
        Check if execution is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.status == ExecutionStatus.RUNNING
        logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
        return running

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for execution.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
        logger.debug(
            "Execution is_stopped_requested: %s (stop_current_execution: %s)",
            stopped,
            self.stop_current_execution.value,
        )
        return stopped
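The from_dict contract above can be summarized in a plain-dict sketch: current_node is required, step and status fall back to defaults, and the persisted "_internal_data" key maps onto the internal_data field. This stand-in omits pydantic validation and the remaining fields:

```python
# Plain-dict sketch of ExecutionState.from_dict's key handling. The real
# method builds a pydantic model and coerces status into ExecutionStatus.
from typing import Any

def execution_state_from_dict(data: dict[str, Any]) -> dict[str, Any]:
    return {
        "current_node": data["current_node"],  # required; raises KeyError if absent
        "step": data.get("step", 0),
        "status": data.get("status", "running"),
        "thread_id": data.get("thread_id"),
        "internal_data": data.get("_internal_data", {}),  # note the key rename
    }
```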

Attributes

current_node instance-attribute
current_node
internal_data class-attribute instance-attribute
internal_data = Field(default_factory=dict)
interrupt_data class-attribute instance-attribute
interrupt_data = None
interrupt_reason class-attribute instance-attribute
interrupt_reason = None
interrupted_node class-attribute instance-attribute
interrupted_node = None
status class-attribute instance-attribute
status = RUNNING
step class-attribute instance-attribute
step = 0
stop_current_execution class-attribute instance-attribute
stop_current_execution = NONE
thread_id class-attribute instance-attribute
thread_id = None

Functions

advance_step
advance_step()

Advance to the next execution step.

Source code in agentflow/state/execution_state.py
def advance_step(self) -> None:
    """
    Advance to the next execution step.
    """
    old_step = self.step
    self.step += 1
    logger.debug("Advanced step from %d to %d", old_step, self.step)
clear_interrupt
clear_interrupt()

Clear the interrupt state and resume execution.

Source code in agentflow/state/execution_state.py
def clear_interrupt(self) -> None:
    """
    Clear the interrupt state and resume execution.
    """
    logger.debug("Clearing interrupt, resuming execution")
    self.interrupted_node = None
    self.interrupt_reason = None
    self.interrupt_data = None
    self.status = ExecutionStatus.RUNNING
complete
complete()

Mark execution as completed.

Source code in agentflow/state/execution_state.py
def complete(self) -> None:
    """
    Mark execution as completed.
    """
    logger.info("Marking execution as completed")
    self.status = ExecutionStatus.COMPLETED
error
error(error_msg)

Mark execution as errored.

Parameters:

Name Type Description Default
error_msg
str

Error message to record.

required
Source code in agentflow/state/execution_state.py
def error(self, error_msg: str) -> None:
    """
    Mark execution as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Marking execution as errored: %s", error_msg)
    self.status = ExecutionStatus.ERROR
    self.internal_data["error"] = error_msg
from_dict classmethod
from_dict(data)

Create an ExecutionState instance from a dictionary.

Parameters:

Name Type Description Default
data
dict[str, Any]

Dictionary containing execution state fields.

required

Returns:

Name Type Description
ExecutionState ExecutionState

The deserialized execution state object.

Source code in agentflow/state/execution_state.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
    """
    Create an ExecutionState instance from a dictionary.

    Args:
        data (dict[str, Any]): Dictionary containing execution state fields.

    Returns:
        ExecutionState: The deserialized execution state object.
    """
    return cls.model_validate(
        {
            "current_node": data["current_node"],
            "step": data.get("step", 0),
            "status": ExecutionStatus(data.get("status", "running")),
            "interrupted_node": data.get("interrupted_node"),
            "interrupt_reason": data.get("interrupt_reason"),
            "interrupt_data": data.get("interrupt_data"),
            "thread_id": data.get("thread_id"),
            "internal_data": data.get("_internal_data", {}),
        }
    )
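A detail worth noting in `from_dict` is that the serialized key `_internal_data` (with a leading underscore) feeds the `internal_data` field, and missing keys fall back to defaults. A minimal stdlib sketch of that same normalization, with a local copy of the `ExecutionStatus` enum from this page (the real class is a pydantic model; `normalize_state_dict` is an illustrative name, not part of the package):

```python
from enum import Enum


class ExecutionStatus(Enum):
    """Local mirror of the enum shown later on this page."""
    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"


def normalize_state_dict(data: dict) -> dict:
    """Apply the same defaults and key mapping as ExecutionState.from_dict.

    The serialized key "_internal_data" maps to the "internal_data" field.
    """
    return {
        "current_node": data["current_node"],  # required key, no default
        "step": data.get("step", 0),
        "status": ExecutionStatus(data.get("status", "running")),
        "interrupted_node": data.get("interrupted_node"),
        "interrupt_reason": data.get("interrupt_reason"),
        "interrupt_data": data.get("interrupt_data"),
        "thread_id": data.get("thread_id"),
        "internal_data": data.get("_internal_data", {}),
    }


fields = normalize_state_dict({"current_node": "plan", "_internal_data": {"k": 1}})
```

If the incoming dict uses `internal_data` rather than `_internal_data`, that data is silently dropped, so serializers should match the underscored key.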
is_interrupted
is_interrupted()

Check if execution is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in agentflow/state/execution_state.py
def is_interrupted(self) -> bool:
    """
    Check if execution is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.status in [
        ExecutionStatus.INTERRUPTED_BEFORE,
        ExecutionStatus.INTERRUPTED_AFTER,
    ]
    logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
    return interrupted
is_running
is_running()

Check if execution is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in agentflow/state/execution_state.py
def is_running(self) -> bool:
    """
    Check if execution is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.status == ExecutionStatus.RUNNING
    logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for execution.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in agentflow/state/execution_state.py
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for execution.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
    logger.debug(
        "Execution is_stopped_requested: %s (stop_current_execution: %s)",
        stopped,
        self.stop_current_execution.value,
    )
    return stopped
set_current_node
set_current_node(node)

Update the current node in execution state.

Parameters:

Name Type Description Default
node
str

Node to set as current.

required
Source code in agentflow/state/execution_state.py
def set_current_node(self, node: str) -> None:
    """
    Update the current node in execution state.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.current_node
    self.current_node = node
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set the interrupt state for execution.

Parameters:

Name Type Description Default
node
str

Node where the interrupt occurred.

required
reason
str

Reason for the interrupt.

required
status
ExecutionStatus

Status to set for the interrupt.

required
data
dict[str, Any] | None

Optional additional interrupt data.

None
Source code in agentflow/state/execution_state.py
def set_interrupt(
    self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
) -> None:
    """
    Set the interrupt state for execution.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status (ExecutionStatus): Status to set for the interrupt.
        data (dict[str, Any] | None): Optional additional interrupt data.
    """
    logger.debug(
        "Setting interrupt: node='%s', reason='%s', status='%s'",
        node,
        reason,
        status.value,
    )
    self.interrupted_node = node
    self.interrupt_reason = reason
    self.status = status
    self.interrupt_data = data
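Together, `set_interrupt`, `is_interrupted`, and `clear_interrupt` form a small pause/resume lifecycle. A self-contained sketch of that lifecycle using a plain-Python stand-in (the real `ExecutionState` is a pydantic model with more fields):

```python
from enum import Enum


class ExecutionStatus(Enum):
    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"


class MiniExecutionState:
    """Stand-in mirroring the interrupt methods shown above."""

    def __init__(self) -> None:
        self.status = ExecutionStatus.RUNNING
        self.interrupted_node: str | None = None
        self.interrupt_reason: str | None = None
        self.interrupt_data: dict | None = None

    def set_interrupt(self, node, reason, status, data=None):
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def is_interrupted(self) -> bool:
        return self.status in (
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        )

    def clear_interrupt(self):
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING


state = MiniExecutionState()
state.set_interrupt("review", "await human approval", ExecutionStatus.INTERRUPTED_BEFORE)
was_interrupted = state.is_interrupted()  # True while paused
state.clear_interrupt()                   # back to RUNNING, interrupt fields cleared
```

Note that `clear_interrupt` always resets the status to `RUNNING`, so callers resuming a graph do not need to restore the prior status themselves.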

ExecutionStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
COMPLETED
ERROR
INTERRUPTED_AFTER
INTERRUPTED_BEFORE
RUNNING
Source code in agentflow/state/execution_state.py
class ExecutionStatus(Enum):
    """Status of graph execution."""

    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"

Attributes

COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED_AFTER class-attribute instance-attribute
INTERRUPTED_AFTER = 'interrupted_after'
INTERRUPTED_BEFORE class-attribute instance-attribute
INTERRUPTED_BEFORE = 'interrupted_before'
RUNNING class-attribute instance-attribute
RUNNING = 'running'

ImageBlock

Bases: BaseModel

Image content block for messages.

Attributes:

Name Type Description
type Literal['image']

Block type discriminator.

media MediaRef

Reference to image media.

alt_text str | None

Alternative text for accessibility.

bbox list[float] | None

Bounding box coordinates [x1, y1, x2, y2].

Source code in agentflow/state/message_block.py
class ImageBlock(BaseModel):
    """
    Image content block for messages.

    Attributes:
        type (Literal["image"]): Block type discriminator.
        media (MediaRef): Reference to image media.
        alt_text (str | None): Alternative text for accessibility.
        bbox (list[float] | None): Bounding box coordinates [x1, y1, x2, y2].
    """

    type: Literal["image"] = "image"
    media: MediaRef
    alt_text: str | None = None
    bbox: list[float] | None = None  # [x1,y1,x2,y2] if applicable

Attributes

alt_text class-attribute instance-attribute
alt_text = None
bbox class-attribute instance-attribute
bbox = None
media instance-attribute
media
type class-attribute instance-attribute
type = 'image'

MediaRef

Bases: BaseModel

Reference to media content (image/audio/video/document/data).

Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

Attributes:

Name Type Description
kind Literal['url', 'file_id', 'data']

Type of reference.

url str | None

URL to media content.

file_id str | None

Provider-managed file ID.

data_base64 str | None

Base64-encoded data (small payloads only).

mime_type str | None

MIME type of the media.

size_bytes int | None

Size in bytes.

sha256 str | None

SHA256 hash of the media.

filename str | None

Filename of the media.

width int | None

Image width (if applicable).

height int | None

Image height (if applicable).

duration_ms int | None

Duration in milliseconds (if applicable).

page int | None

Page number (if applicable).

Source code in agentflow/state/message_block.py
class MediaRef(BaseModel):
    """
    Reference to media content (image/audio/video/document/data).

    Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

    Attributes:
        kind (Literal["url", "file_id", "data"]): Type of reference.
        url (str | None): URL to media content.
        file_id (str | None): Provider-managed file ID.
        data_base64 (str | None): Base64-encoded data (small payloads only).
        mime_type (str | None): MIME type of the media.
        size_bytes (int | None): Size in bytes.
        sha256 (str | None): SHA256 hash of the media.
        filename (str | None): Filename of the media.
        width (int | None): Image width (if applicable).
        height (int | None): Image height (if applicable).
        duration_ms (int | None): Duration in milliseconds (if applicable).
        page (int | None): Page number (if applicable).
    """

    kind: Literal["url", "file_id", "data"] = "url"
    url: str | None = None  # http(s) or data: URL
    file_id: str | None = None  # provider-managed ID (e.g., OpenAI/Gemini)
    data_base64: str | None = None  # small payloads only
    mime_type: str | None = None
    size_bytes: int | None = None
    sha256: str | None = None
    filename: str | None = None
    # Media-specific hints
    width: int | None = None
    height: int | None = None
    duration_ms: int | None = None
    page: int | None = None

Attributes

data_base64 class-attribute instance-attribute
data_base64 = None
duration_ms class-attribute instance-attribute
duration_ms = None
file_id class-attribute instance-attribute
file_id = None
filename class-attribute instance-attribute
filename = None
height class-attribute instance-attribute
height = None
kind class-attribute instance-attribute
kind = 'url'
mime_type class-attribute instance-attribute
mime_type = None
page class-attribute instance-attribute
page = None
sha256 class-attribute instance-attribute
sha256 = None
size_bytes class-attribute instance-attribute
size_bytes = None
url class-attribute instance-attribute
url = None
width class-attribute instance-attribute
width = None

Message

Bases: BaseModel

Represents a message in a conversation, including content, role, metadata, and token usage.

Attributes:

Name Type Description
message_id str | int

Unique identifier for the message.

role Literal['user', 'assistant', 'system', 'tool']

The role of the message sender.

content list[ContentBlock]

The message content blocks.

delta bool

Indicates if this is a delta/partial message.

tools_calls list[dict[str, Any]] | None

Tool call information, if any.

reasoning str | None

Reasoning or explanation, if any.

timestamp float | None

Timestamp of the message (UNIX epoch seconds).

metadata dict[str, Any]

Additional metadata.

usages TokenUsages | None

Token usage statistics.

raw dict[str, Any] | None

Raw data, if any.

Example

msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])

Methods:

Name Description
attach_media

Append a media block to the content.

text

Best-effort text extraction from content blocks.

text_message

Create a Message instance from plain text.

tool_message

Create a tool message from content blocks.

Source code in agentflow/state/message.py
class Message(BaseModel):
    """
    Represents a message in a conversation, including content, role, metadata, and token usage.

    Attributes:
        message_id (str | int): Unique identifier for the message.
        role (Literal["user", "assistant", "system", "tool"]): The role of the message sender.
        content (list[ContentBlock]): The message content blocks.
        delta (bool): Indicates if this is a delta/partial message.
        tools_calls (list[dict[str, Any]] | None): Tool call information, if any.
        reasoning (str | None): Reasoning or explanation, if any.
        timestamp (float | None): Timestamp of the message (UNIX epoch seconds).
        metadata (dict[str, Any]): Additional metadata.
        usages (TokenUsages | None): Token usage statistics.
        raw (dict[str, Any] | None): Raw data, if any.

    Example:
        >>> msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])
    """

    message_id: str | int = Field(default_factory=lambda: generate_id(None))
    role: Literal["user", "assistant", "system", "tool"]
    content: list[ContentBlock]
    delta: bool = False  # Indicates if this is a delta/partial message
    tools_calls: list[dict[str, Any]] | None = None
    reasoning: str | None = None  # Remove it
    timestamp: float | None = Field(default_factory=lambda: datetime.now().timestamp())
    metadata: dict[str, Any] = Field(default_factory=dict)
    usages: TokenUsages | None = None
    raw: dict[str, Any] | None = None

    @classmethod
    def text_message(
        cls,
        content: str,
        role: Literal["user", "assistant", "system", "tool"] = "user",
        message_id: str | None = None,
    ) -> "Message":
        """
        Create a Message instance from plain text.

        Args:
            content (str): The message content.
            role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
            message_id (str | None): Optional message ID.

        Returns:
            Message: The created Message instance.

        Example:
            >>> Message.text_message("Hello!", role="user")
        """
        logger.debug("Creating message from text with role: %s", role)
        return cls(
            message_id=generate_id(message_id),
            role=role,
            content=[TextBlock(text=content)],
            metadata={},
        )

    @classmethod
    def tool_message(
        cls,
        content: list[ContentBlock],
        message_id: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> "Message":
        """
    Create a tool message from content blocks.

        Args:
            content (list[ContentBlock]): The message content blocks.
            message_id (str | None): Optional message ID.
            meta (dict[str, Any] | None): Optional metadata.

        Returns:
            Message: The created tool message instance.

        Example:
            >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
        """
        res = content
        msg_id = generate_id(message_id)
        return cls(
            message_id=msg_id,
            role="tool",
            content=res,
            metadata=meta or {},
        )

    # --- Convenience helpers ---
    def text(self) -> str:
        """
        Best-effort text extraction from content blocks.

        Returns:
            str: Concatenated text from TextBlock and ToolResultBlock outputs.

        Example:
            >>> msg.text()
            'Hello!Result text.'
        """
        parts: list[str] = []
        for block in self.content:
            if isinstance(block, TextBlock):
                parts.append(block.text)
            elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
                parts.append(block.output)
        return "".join(parts)

    def attach_media(
        self,
        media: MediaRef,
        as_type: Literal["image", "audio", "video", "document"],
    ) -> None:
        """
        Append a media block to the content.

        If content was text, creates a block list. Supports image, audio, video, and document types.

        Args:
            media (MediaRef): Reference to media content.
            as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

        Returns:
            None

        Raises:
            ValueError: If an unsupported media type is provided.

        Example:
            >>> msg.attach_media(media_ref, as_type="image")
        """
        block: ContentBlock
        if as_type == "image":
            block = ImageBlock(media=media)
        elif as_type == "audio":
            block = AudioBlock(media=media)
        elif as_type == "video":
            block = VideoBlock(media=media)
        elif as_type == "document":
            block = DocumentBlock(media=media)
        else:
            raise ValueError(f"Unsupported media type: {as_type}")

        if isinstance(self.content, str):
            self.content = [TextBlock(text=self.content), block]
        elif isinstance(self.content, list):
            self.content.append(block)
        else:
            self.content = [block]

Attributes

content instance-attribute
content
delta class-attribute instance-attribute
delta = False
message_id class-attribute instance-attribute
message_id = Field(default_factory=lambda: generate_id(None))
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
raw class-attribute instance-attribute
raw = None
reasoning class-attribute instance-attribute
reasoning = None
role instance-attribute
role
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=lambda: timestamp())
tools_calls class-attribute instance-attribute
tools_calls = None
usages class-attribute instance-attribute
usages = None

Functions

attach_media
attach_media(media, as_type)

Append a media block to the content.

If content was text, creates a block list. Supports image, audio, video, and document types.

Parameters:

Name Type Description Default
media
MediaRef

Reference to media content.

required
as_type
Literal['image', 'audio', 'video', 'document']

Type of media block to append.

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If an unsupported media type is provided.

Example

msg.attach_media(media_ref, as_type="image")

Source code in agentflow/state/message.py
def attach_media(
    self,
    media: MediaRef,
    as_type: Literal["image", "audio", "video", "document"],
) -> None:
    """
    Append a media block to the content.

    If content was text, creates a block list. Supports image, audio, video, and document types.

    Args:
        media (MediaRef): Reference to media content.
        as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

    Returns:
        None

    Raises:
        ValueError: If an unsupported media type is provided.

    Example:
        >>> msg.attach_media(media_ref, as_type="image")
    """
    block: ContentBlock
    if as_type == "image":
        block = ImageBlock(media=media)
    elif as_type == "audio":
        block = AudioBlock(media=media)
    elif as_type == "video":
        block = VideoBlock(media=media)
    elif as_type == "document":
        block = DocumentBlock(media=media)
    else:
        raise ValueError(f"Unsupported media type: {as_type}")

    if isinstance(self.content, str):
        self.content = [TextBlock(text=self.content), block]
    elif isinstance(self.content, list):
        self.content.append(block)
    else:
        self.content = [block]
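The if/elif chain in `attach_media` is equivalent to a type-to-constructor mapping with a `ValueError` fallback. A hedged sketch of that dispatch using placeholder block classes (the real ones are the pydantic models documented on this page; `build_block` is an illustrative name):

```python
class _Block:
    """Placeholder for the real pydantic block models."""
    def __init__(self, media):
        self.media = media


class ImageBlock(_Block): type = "image"
class AudioBlock(_Block): type = "audio"
class VideoBlock(_Block): type = "video"
class DocumentBlock(_Block): type = "document"


_BLOCK_TYPES = {
    "image": ImageBlock,
    "audio": AudioBlock,
    "video": VideoBlock,
    "document": DocumentBlock,
}


def build_block(media, as_type: str):
    """Mirror attach_media's dispatch, including the ValueError branch."""
    try:
        return _BLOCK_TYPES[as_type](media)
    except KeyError:
        raise ValueError(f"Unsupported media type: {as_type}") from None


block = build_block({"url": "https://example.com/cat.png"}, "image")
```

Either form works; the mapping version makes the supported types enumerable in one place.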
text
text()

Best-effort text extraction from content blocks.

Returns:

Name Type Description
str str

Concatenated text from TextBlock and ToolResultBlock outputs.

Example

msg.text()
'Hello!Result text.'

Source code in agentflow/state/message.py
def text(self) -> str:
    """
    Best-effort text extraction from content blocks.

    Returns:
        str: Concatenated text from TextBlock and ToolResultBlock outputs.

    Example:
        >>> msg.text()
        'Hello!Result text.'
    """
    parts: list[str] = []
    for block in self.content:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)
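Note that `text()` concatenates with no separator, which is why the example yields `'Hello!Result text.'`, and that non-string tool outputs are skipped. A self-contained sketch of the same extraction using dataclass stand-ins for the block models:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TextBlock:
    text: str


@dataclass
class ToolResultBlock:
    output: Any


def extract_text(blocks: list) -> str:
    """Mirror Message.text(): keep TextBlock text and string tool outputs."""
    parts: list[str] = []
    for block in blocks:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)


joined = extract_text(
    [TextBlock("Hello!"), ToolResultBlock({"raw": 1}), ToolResultBlock("Result text.")]
)
```

Callers wanting whitespace between blocks should join the parts themselves rather than rely on this helper.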
text_message classmethod
text_message(content, role='user', message_id=None)

Create a Message instance from plain text.

Parameters:

Name Type Description Default
content
str

The message content.

required
role
Literal['user', 'assistant', 'system', 'tool']

The role of the sender.

'user'
message_id
str | None

Optional message ID.

None

Returns:

Name Type Description
Message Message

The created Message instance.

Example

Message.text_message("Hello!", role="user")

Source code in agentflow/state/message.py
@classmethod
def text_message(
    cls,
    content: str,
    role: Literal["user", "assistant", "system", "tool"] = "user",
    message_id: str | None = None,
) -> "Message":
    """
    Create a Message instance from plain text.

    Args:
        content (str): The message content.
        role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
        message_id (str | None): Optional message ID.

    Returns:
        Message: The created Message instance.

    Example:
        >>> Message.text_message("Hello!", role="user")
    """
    logger.debug("Creating message from text with role: %s", role)
    return cls(
        message_id=generate_id(message_id),
        role=role,
        content=[TextBlock(text=content)],
        metadata={},
    )
tool_message classmethod
tool_message(content, message_id=None, meta=None)

Create a tool message from content blocks.

Parameters:

Name Type Description Default
content
list[ContentBlock]

The message content blocks.

required
message_id
str | None

Optional message ID.

None
meta
dict[str, Any] | None

Optional metadata.

None

Returns:

Name Type Description
Message Message

The created tool message instance.

Example

Message.tool_message([ToolResultBlock(...)], message_id="tool1")

Source code in agentflow/state/message.py
@classmethod
def tool_message(
    cls,
    content: list[ContentBlock],
    message_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> "Message":
    """
    Create a tool message from content blocks.

    Args:
        content (list[ContentBlock]): The message content blocks.
        message_id (str | None): Optional message ID.
        meta (dict[str, Any] | None): Optional metadata.

    Returns:
        Message: The created tool message instance.

    Example:
        >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
    """
    res = content
    msg_id = generate_id(message_id)
    return cls(
        message_id=msg_id,
        role="tool",
        content=res,
        metadata=meta or {},
    )

MessageContextManager

Bases: BaseContextManager[S]

Manages the context field for AI interactions.

This class trims the context (message history) based on a maximum number of user messages, ensuring the first message (usually a system prompt) is always preserved. Optionally removes tool-related messages (AI messages with tool calls and tool result messages). Generic over AgentState or its subclasses.

Methods:

Name Description
__init__

Initialize the MessageContextManager.

atrim_context

Asynchronous version of trim_context.

trim_context

Trim the context in the given AgentState based on the maximum number of user messages.

Attributes:

Name Type Description
max_messages
remove_tool_msgs
Source code in agentflow/state/message_context_manager.py
class MessageContextManager(BaseContextManager[S]):
    """
    Manages the context field for AI interactions.

    This class trims the context (message history) based on a maximum number of user messages,
    ensuring the first message (usually a system prompt) is always preserved.
    Optionally removes tool-related messages (AI messages with tool calls and tool result messages).
    Generic over AgentState or its subclasses.
    """

    def __init__(self, max_messages: int = 10, remove_tool_msgs: bool = False) -> None:
        """
        Initialize the MessageContextManager.

        Args:
            max_messages (int): Maximum number of
                user messages to keep in context. Default is 10.
            remove_tool_msgs (bool): Whether to remove tool messages from context.
                Default is False.
        """
        self.max_messages = max_messages
        self.remove_tool_msgs = remove_tool_msgs
        logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)

    def _trim(self, messages: list[Message]) -> list[Message] | None:
        """
        Trim messages keeping system messages and most recent user messages.

        Returns None if no trimming is needed, otherwise returns the trimmed list.
        """
        # check context is empty
        if not messages:
            logger.debug("No messages to trim; context is empty")
            return None

        # First, remove tool messages if requested
        if self.remove_tool_msgs:
            messages = remove_tool_messages(messages)
            logger.debug("Removed tool messages, %d messages remaining", len(messages))

        # Count user messages
        user_message_count = sum(1 for msg in messages if msg.role == "user")

        if user_message_count <= self.max_messages:
            # Check if we removed tool messages but no trimming needed
            if self.remove_tool_msgs:
                # Return the filtered messages even if count is within limits
                return messages
            # no trimming needed
            logger.debug(
                "No trimming needed; context is within limits (%d user messages)",
                user_message_count,
            )
            return None

        # Separate system messages (usually at the beginning)
        system_messages = [msg for msg in messages if msg.role == "system"]
        non_system_messages = [msg for msg in messages if msg.role != "system"]

        # Find the index of the oldest user message to keep
        user_count = 0
        start_index = len(non_system_messages)

        # Iterate from the end to find the position to start keeping messages
        for i in range(len(non_system_messages) - 1, -1, -1):
            msg = non_system_messages[i]
            if msg.role == "user":
                user_count += 1
                if user_count == self.max_messages:
                    start_index = i
                    break

        # Keep messages from start_index onwards
        final_non_system = non_system_messages[start_index:]

        # Combine system messages (at start) with trimmed conversation
        trimmed_messages = system_messages + final_non_system

        logger.debug(
            "Trimmed from %d to %d messages (%d user messages kept)",
            len(messages),
            len(trimmed_messages),
            self.max_messages,
        )

        return trimmed_messages

    def trim_context(self, state: S) -> S:
        """
        Trim the context in the given AgentState based on the maximum number of user messages.

        The first message (typically a system prompt) is always preserved. Only the most recent
        user messages up to `max_messages` are kept, along with the first message.

        If `remove_tool_msgs` is True, also removes:
        - AI messages that contain tool calls (intermediate tool-calling messages)
        - Tool result messages (role="tool")

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state

    async def atrim_context(self, state: S) -> S:
        """
        Asynchronous version of trim_context.

        If `remove_tool_msgs` is True, also removes:
        - AI messages that contain tool calls (intermediate tool-calling messages)
        - Tool result messages (role="tool")

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state
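The trimming rule in `_trim` keeps every system message plus the conversation tail starting at the `max_messages`-th most recent user message (so assistant replies older than that window are dropped along with their user turns). A self-contained sketch of that algorithm over simple `(role, text)` tuples:

```python
def trim_context(messages: list[tuple[str, str]], max_user: int) -> list[tuple[str, str]]:
    """Mirror MessageContextManager._trim for (role, text) tuples.

    Keeps all system messages, plus the tail of the conversation starting
    at the max_user-th most recent user message.
    """
    if sum(1 for role, _ in messages if role == "user") <= max_user:
        return messages  # within limits, nothing to trim

    system = [m for m in messages if m[0] == "system"]
    rest = [m for m in messages if m[0] != "system"]

    # Walk backwards until we have seen max_user user messages.
    seen = 0
    start = len(rest)
    for i in range(len(rest) - 1, -1, -1):
        if rest[i][0] == "user":
            seen += 1
            if seen == max_user:
                start = i
                break
    return system + rest[start:]


history = [
    ("system", "You are helpful."),
    ("user", "q1"), ("assistant", "a1"),
    ("user", "q2"), ("assistant", "a2"),
    ("user", "q3"), ("assistant", "a3"),
]
trimmed = trim_context(history, max_user=2)
```

With `max_user=2`, the turn `q1`/`a1` is dropped while the system prompt and the last two user turns survive, matching the class docstring's guarantee that the system prompt is always preserved.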

Attributes

max_messages instance-attribute
max_messages = max_messages
remove_tool_msgs instance-attribute
remove_tool_msgs = remove_tool_msgs

Functions

__init__
__init__(max_messages=10, remove_tool_msgs=False)

Initialize the MessageContextManager.

Parameters:

Name Type Description Default
max_messages
int

Maximum number of user messages to keep in context. Default is 10.

10
remove_tool_msgs
bool

Whether to remove tool messages from context. Default is False.

False
Source code in agentflow/state/message_context_manager.py (lines 33-45)
def __init__(self, max_messages: int = 10, remove_tool_msgs: bool = False) -> None:
    """
    Initialize the MessageContextManager.

    Args:
        max_messages (int): Maximum number of
            user messages to keep in context. Default is 10.
        remove_tool_msgs (bool): Whether to remove tool messages from context.
            Default is False.
    """
    self.max_messages = max_messages
    self.remove_tool_msgs = remove_tool_msgs
    logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)
atrim_context async
atrim_context(state)

Asynchronous version of trim_context.

If remove_tool_msgs is True, also removes:

- AI messages that contain tool calls (intermediate tool-calling messages)
- Tool result messages (role="tool")

Parameters:

Name Type Description Default
state
AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in agentflow/state/message_context_manager.py (lines 133-151)
async def atrim_context(self, state: S) -> S:
    """
    Asynchronous version of trim_context.

    If `remove_tool_msgs` is True, also removes:
    - AI messages that contain tool calls (intermediate tool-calling messages)
    - Tool result messages (role="tool")

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state
trim_context
trim_context(state)

Trim the context in the given AgentState based on the maximum number of user messages.

The first message (typically a system prompt) is always preserved. Only the most recent user messages up to max_messages are kept, along with the first message.

If remove_tool_msgs is True, also removes:

- AI messages that contain tool calls (intermediate tool-calling messages)
- Tool result messages (role="tool")

Parameters:

Name Type Description Default
state
AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in agentflow/state/message_context_manager.py (lines 110-131)
def trim_context(self, state: S) -> S:
    """
    Trim the context in the given AgentState based on the maximum number of user messages.

    The first message (typically a system prompt) is always preserved. Only the most recent
    user messages up to `max_messages` are kept, along with the first message.

    If `remove_tool_msgs` is True, also removes:
    - AI messages that contain tool calls (intermediate tool-calling messages)
    - Tool result messages (role="tool")

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state
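The `_trim` helper referenced above is internal and not shown here; the documented behavior — preserve system messages, keep only the most recent `max_messages` user turns and everything after them — can be sketched stand-alone. The `Msg` dataclass and `trim` function below are hypothetical stand-ins for illustration, not the agentflow API:

```python
from dataclasses import dataclass

@dataclass
class Msg:  # hypothetical stand-in for agentflow's Message
    role: str
    text: str

def trim(messages: list[Msg], max_messages: int = 10) -> list[Msg]:
    """Keep system messages, plus the tail starting at the oldest retained user turn."""
    system = [m for m in messages if m.role == "system"]
    non_system = [m for m in messages if m.role != "system"]
    user_idx = [i for i, m in enumerate(non_system) if m.role == "user"]
    if len(user_idx) <= max_messages:
        return messages  # under the limit: nothing to trim
    start = user_idx[-max_messages]  # oldest user message we keep
    return system + non_system[start:]

msgs = [Msg("system", "s")] + [Msg("user", f"u{i}") for i in range(5)]
trimmed = trim(msgs, max_messages=2)
# system message survives; only the two most recent user turns remain
```

Intermediate assistant/tool messages that follow the oldest retained user turn are kept, so the surviving window stays conversationally coherent.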

ReasoningBlock

Bases: BaseModel

Reasoning content block for messages.

Attributes:

Name Type Description
type Literal['reasoning']

Block type discriminator.

summary str

Summary of reasoning.

details list[str] | None

Detailed reasoning steps.

Source code in agentflow/state/message_block.py (lines 218-230)
class ReasoningBlock(BaseModel):
    """
    Reasoning content block for messages.

    Attributes:
        type (Literal["reasoning"]): Block type discriminator.
        summary (str): Summary of reasoning.
        details (list[str] | None): Detailed reasoning steps.
    """

    type: Literal["reasoning"] = "reasoning"
    summary: str
    details: list[str] | None = None

Attributes

details class-attribute instance-attribute
details = None
summary instance-attribute
summary
type class-attribute instance-attribute
type = 'reasoning'

StreamChunk

Bases: BaseModel

Unified wrapper for different types of streaming data.

This class provides a single interface for handling various streaming chunk types (messages, events, state updates, errors) with type-safe discrimination.

Attributes:

Name Type Description
event StreamEvent

The type of streaming chunk.

message Message | None

Message payload, when the chunk carries a message.

state AgentState | None

State snapshot, when the chunk carries a state update.

data dict | None

The actual chunk data for other chunk types.

metadata dict | None

Optional additional metadata for the chunk.

Classes:

Name Description
Config

Pydantic configuration for StreamChunk.

Source code in agentflow/state/stream_chunks.py (lines 29-66)
class StreamChunk(BaseModel):
    """
    Unified wrapper for different types of streaming data.

    This class provides a single interface for handling various streaming chunk types
    (messages, events, state updates, errors) with type-safe discrimination.

    Attributes:
        event: The type of streaming chunk.
        message: Message payload, when the chunk carries a message.
        state: State snapshot, when the chunk carries a state update.
        data: The actual chunk data for other chunk types.
        metadata: Optional additional metadata for the chunk.
    """

    event: StreamEvent = StreamEvent.MESSAGE
    # data holders for different chunk types
    message: Message | None = None
    state: AgentState | None = None
    # Placeholder for other chunk types
    data: dict | None = None

    # Optional identifiers
    thread_id: str | None = None
    run_id: str | None = None
    # Optional metadata
    metadata: dict | None = None
    timestamp: float = Field(
        # use a lambda so each chunk gets a fresh timestamp; a bare
        # datetime.now().timestamp binds one fixed time at class-definition time
        default_factory=lambda: datetime.now().timestamp(),
        description="UNIX timestamp of when chunk was created",
    )

    class Config:
        """Pydantic configuration for EventModel.

        Attributes:
            use_enum_values: Output enums as strings.
        """

        use_enum_values = True

Attributes

data class-attribute instance-attribute
data = None
event class-attribute instance-attribute
event = MESSAGE
message class-attribute instance-attribute
message = None
metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
state class-attribute instance-attribute
state = None
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=lambda: datetime.now().timestamp(), description='UNIX timestamp of when chunk was created')

Classes

Config

Pydantic configuration for StreamChunk.

Attributes:

Name Type Description
use_enum_values

Output enums as strings.

Source code in agentflow/state/stream_chunks.py (lines 59-66)
class Config:
    """Pydantic configuration for EventModel.

    Attributes:
        use_enum_values: Output enums as strings.
    """

    use_enum_values = True
Attributes
use_enum_values class-attribute instance-attribute
use_enum_values = True

StreamEvent

Bases: str, Enum

Attributes:

Name Type Description
ERROR
MESSAGE
STATE
UPDATES
Source code in agentflow/state/stream_chunks.py (lines 22-26)
class StreamEvent(str, enum.Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"

Attributes

ERROR class-attribute instance-attribute
ERROR = 'error'
MESSAGE class-attribute instance-attribute
MESSAGE = 'message'
STATE class-attribute instance-attribute
STATE = 'state'
UPDATES class-attribute instance-attribute
UPDATES = 'updates'
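A consumer typically switches on the chunk's `event` discriminator. A minimal stdlib-only sketch, re-declaring the enum values shown above; the `handle_chunk` helper is illustrative, not part of the package:

```python
import enum

class StreamEvent(str, enum.Enum):  # mirrors agentflow's StreamEvent values
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"

def handle_chunk(event: StreamEvent, payload) -> str:
    """Dispatch on the event discriminator, as a StreamChunk consumer would."""
    if event is StreamEvent.MESSAGE:
        return f"message: {payload}"
    if event is StreamEvent.ERROR:
        return f"error: {payload}"
    # STATE and UPDATES carry structured data rather than text
    return f"{event.value} update"

out = handle_chunk(StreamEvent.MESSAGE, "hello")
```

Because `StreamEvent` subclasses `str` (and `StreamChunk.Config` sets `use_enum_values`), serialized chunks carry the plain string values, so wire formats stay stable.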

TextBlock

Bases: BaseModel

Text content block for messages.

Attributes:

Name Type Description
type Literal['text']

Block type discriminator.

text str

Text content.

annotations list[AnnotationRef]

List of annotation references.

Source code in agentflow/state/message_block.py (lines 61-73)
class TextBlock(BaseModel):
    """
    Text content block for messages.

    Attributes:
        type (Literal["text"]): Block type discriminator.
        text (str): Text content.
        annotations (list[AnnotationRef]): List of annotation references.
    """

    type: Literal["text"] = "text"
    text: str
    annotations: list[AnnotationRef] = Field(default_factory=list)

Attributes

annotations class-attribute instance-attribute
annotations = Field(default_factory=list)
text instance-attribute
text
type class-attribute instance-attribute
type = 'text'

TokenUsages

Bases: BaseModel

Tracks token usage statistics for a message or model response.

Attributes:

Name Type Description
completion_tokens int

Number of completion tokens used.

prompt_tokens int

Number of prompt tokens used.

total_tokens int

Total tokens used.

reasoning_tokens int

Reasoning tokens used (optional).

cache_creation_input_tokens int

Cache creation input tokens (optional).

cache_read_input_tokens int

Cache read input tokens (optional).

image_tokens int | None

Image tokens for multimodal models (optional).

audio_tokens int | None

Audio tokens for multimodal models (optional).

Example

>>> usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)

Source code in agentflow/state/message.py (lines 99-126)
class TokenUsages(BaseModel):
    """
    Tracks token usage statistics for a message or model response.

    Attributes:
        completion_tokens (int): Number of completion tokens used.
        prompt_tokens (int): Number of prompt tokens used.
        total_tokens (int): Total tokens used.
        reasoning_tokens (int): Reasoning tokens used (optional).
        cache_creation_input_tokens (int): Cache creation input tokens (optional).
        cache_read_input_tokens (int): Cache read input tokens (optional).
        image_tokens (int | None): Image tokens for multimodal models (optional).
        audio_tokens (int | None): Audio tokens for multimodal models (optional).

    Example:
        >>> usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)
        >>> usage.total_tokens
        30
    """

    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    # Optional modality-specific usage fields for multimodal models
    image_tokens: int | None = 0
    audio_tokens: int | None = 0

Attributes

audio_tokens class-attribute instance-attribute
audio_tokens = 0
cache_creation_input_tokens class-attribute instance-attribute
cache_creation_input_tokens = 0
cache_read_input_tokens class-attribute instance-attribute
cache_read_input_tokens = 0
completion_tokens instance-attribute
completion_tokens
image_tokens class-attribute instance-attribute
image_tokens = 0
prompt_tokens instance-attribute
prompt_tokens
reasoning_tokens class-attribute instance-attribute
reasoning_tokens = 0
total_tokens instance-attribute
total_tokens
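Per-message usages are often rolled up into a run-level total for billing or budgeting. A sketch with a minimal `Usage` stand-in; the `aggregate` helper is illustrative, not part of the package:

```python
from dataclasses import dataclass

@dataclass
class Usage:  # minimal stand-in for TokenUsages' required fields
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int

def aggregate(usages: list[Usage]) -> Usage:
    """Sum per-message usage into a single run-level total."""
    return Usage(
        completion_tokens=sum(u.completion_tokens for u in usages),
        prompt_tokens=sum(u.prompt_tokens for u in usages),
        total_tokens=sum(u.total_tokens for u in usages),
    )

total = aggregate([Usage(10, 20, 30), Usage(5, 15, 20)])
```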

ToolCallBlock

Bases: BaseModel

Tool call content block for messages.

Attributes:

Name Type Description
type Literal['tool_call']

Block type discriminator.

id str

Tool call ID.

name str

Tool name.

args dict[str, Any]

Arguments for the tool call.

tool_type str | None

Type of tool (e.g., web_search, file_search).

Source code in agentflow/state/message_block.py (lines 161-177)
class ToolCallBlock(BaseModel):
    """
    Tool call content block for messages.

    Attributes:
        type (Literal["tool_call"]): Block type discriminator.
        id (str): Tool call ID.
        name (str): Tool name.
        args (dict[str, Any]): Arguments for the tool call.
        tool_type (str | None): Type of tool (e.g., web_search, file_search).
    """

    type: Literal["tool_call"] = "tool_call"
    id: str
    name: str
    args: dict[str, Any] = Field(default_factory=dict)
    tool_type: str | None = None  # e.g., web_search, file_search, computer_use

Attributes

args class-attribute instance-attribute
args = Field(default_factory=dict)
id instance-attribute
id
name instance-attribute
name
tool_type class-attribute instance-attribute
tool_type = None
type class-attribute instance-attribute
type = 'tool_call'

ToolResultBlock

Bases: BaseModel

Tool result content block for messages.

Attributes:

Name Type Description
type Literal['tool_result']

Block type discriminator.

call_id str

Tool call ID.

output Any

Output from the tool (str, dict, MediaRef, or list of blocks).

is_error bool

Whether the result is an error.

status Literal['completed', 'failed'] | None

Status of the tool call.

Source code in agentflow/state/message_block.py (lines 199-215)
class ToolResultBlock(BaseModel):
    """
    Tool result content block for messages.

    Attributes:
        type (Literal["tool_result"]): Block type discriminator.
        call_id (str): Tool call ID.
        output (Any): Output from the tool (str, dict, MediaRef, or list of blocks).
        is_error (bool): Whether the result is an error.
        status (Literal["completed", "failed"] | None): Status of the tool call.
    """

    type: Literal["tool_result"] = "tool_result"
    call_id: str
    output: Any = None  # str | dict | MediaRef | list[ContentBlock-like]
    is_error: bool = False
    status: Literal["completed", "failed"] | None = None

Attributes

call_id instance-attribute
call_id
is_error class-attribute instance-attribute
is_error = False
output class-attribute instance-attribute
output = None
status class-attribute instance-attribute
status = None
type class-attribute instance-attribute
type = 'tool_result'

VideoBlock

Bases: BaseModel

Video content block for messages.

Attributes:

Name Type Description
type Literal['video']

Block type discriminator.

media MediaRef

Reference to video media.

thumbnail MediaRef | None

Reference to thumbnail image.

Source code in agentflow/state/message_block.py (lines 112-124)
class VideoBlock(BaseModel):
    """
    Video content block for messages.

    Attributes:
        type (Literal["video"]): Block type discriminator.
        media (MediaRef): Reference to video media.
        thumbnail (MediaRef | None): Reference to thumbnail image.
    """

    type: Literal["video"] = "video"
    media: MediaRef
    thumbnail: MediaRef | None = None

Attributes

media instance-attribute
media
thumbnail class-attribute instance-attribute
thumbnail = None
type class-attribute instance-attribute
type = 'video'

Functions

add_messages

add_messages(left, right)

Adds messages to the list, avoiding duplicates by message_id.

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages.

required

right

list[Message]

New messages to add.

required

Returns:

Type Description
list[Message]

list[Message]: Combined list with unique messages.

Example

>>> add_messages([msg1], [msg2, msg1])
[msg1, msg2]

Source code in agentflow/state/reducers.py (lines 17-34)
def add_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Adds messages to the list, avoiding duplicates by message_id.

    Args:
        left (list[Message]): Existing list of messages.
        right (list[Message]): New messages to add.

    Returns:
        list[Message]: Combined list with unique messages.

    Example:
        >>> add_messages([msg1], [msg2, msg1])
        [msg1, msg2]
    """
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids and msg.delta is False]
    return left + right
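Note that beyond the documented id-based dedup, the implementation also drops messages with `delta` set, i.e. partial streaming chunks never accumulate in state. The behavior can be exercised with a minimal stand-in (the `Msg` dataclass is hypothetical; only `message_id` and `delta` matter here):

```python
from dataclasses import dataclass

@dataclass
class Msg:  # stand-in exposing the two fields add_messages inspects
    message_id: str
    delta: bool = False

def add_messages(left: list[Msg], right: list[Msg]) -> list[Msg]:
    """Append only messages whose id is new, skipping streaming deltas."""
    left_ids = {m.message_id for m in left}
    new = [m for m in right if m.message_id not in left_ids and m.delta is False]
    return left + new

merged = add_messages([Msg("a")], [Msg("b"), Msg("a"), Msg("c", delta=True)])
# "a" is a duplicate and "c" is a delta, so only "b" is appended
```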

append_items

append_items(left, right)

Appends items to a list, avoiding duplicates by item.id.

Parameters:

Name Type Description Default

left

list

Existing list of items (must have .id attribute).

required

right

list

New items to add.

required

Returns:

Name Type Description
list list

Combined list with unique items.

Example

>>> append_items([item1], [item2, item1])
[item1, item2]

Source code in agentflow/state/reducers.py (lines 55-72)
def append_items(left: list, right: list) -> list:
    """
    Appends items to a list, avoiding duplicates by item.id.

    Args:
        left (list): Existing list of items (must have .id attribute).
        right (list): New items to add.

    Returns:
        list: Combined list with unique items.

    Example:
        >>> append_items([item1], [item2, item1])
        [item1, item2]
    """
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right

remove_tool_messages

remove_tool_messages(messages)

Remove COMPLETED tool interaction sequences from the message list.

A tool sequence is only removed if it's COMPLETE:

1. AI message with tool_calls (triggering tools)
2. One or more tool result messages (role="tool")
3. AI message WITHOUT tool_calls (final response using tool results)

If a sequence is incomplete (e.g., tool call made but no final AI response yet), ALL messages are kept to maintain conversation continuity.

Edge cases handled:

- Incomplete sequences (AI called tool, waiting for results): Keep everything
- Partial sequences (AI called tool, got results, but no final response): Keep everything
- Multiple tool calls in one AI message: Handles correctly
- Consecutive tool sequences: Each evaluated independently

Parameters:

Name Type Description Default

messages

list[Message]

List of messages to filter.

required

Returns:

Type Description
list[Message]

list[Message]: Filtered list with only COMPLETED tool sequences removed.

Example

Complete sequence (will be cleaned):

>>> messages = [user_msg, ai_with_tools, tool_result, ai_final]
>>> remove_tool_messages(messages)
[user_msg, ai_final]

Incomplete sequence (will be kept):

>>> messages = [user_msg, ai_with_tools]
>>> remove_tool_messages(messages)
[user_msg, ai_with_tools]  # Keep everything - sequence incomplete!

Partial sequence (will be kept):

>>> messages = [user_msg, ai_with_tools, tool_result]
>>> remove_tool_messages(messages)
[user_msg, ai_with_tools, tool_result]  # Keep - no final AI response!

Source code in agentflow/state/reducers.py (lines 93-176)
def remove_tool_messages(messages: list[Message]) -> list[Message]:
    """
    Remove COMPLETED tool interaction sequences from the message list.

    A tool sequence is only removed if it's COMPLETE:
    1. AI message with tool_calls (triggering tools)
    2. One or more tool result messages (role="tool")
    3. AI message WITHOUT tool_calls (final response using tool results)

    If a sequence is incomplete (e.g., tool call made but no final AI response yet),
    ALL messages are kept to maintain conversation continuity.

    Edge cases handled:
    - Incomplete sequences (AI called tool, waiting for results): Keep everything
    - Partial sequences (AI called tool, got results, but no final response): Keep everything
    - Multiple tool calls in one AI message: Handles correctly
    - Consecutive tool sequences: Each evaluated independently

    Args:
        messages (list[Message]): List of messages to filter.

    Returns:
        list[Message]: Filtered list with only COMPLETED tool sequences removed.

    Example:
        Complete sequence (will be cleaned):
        >>> messages = [user_msg, ai_with_tools, tool_result, ai_final]
        >>> remove_tool_messages(messages)
        [user_msg, ai_final]

        Incomplete sequence (will be kept):
        >>> messages = [user_msg, ai_with_tools]
        >>> remove_tool_messages(messages)
        [user_msg, ai_with_tools]  # Keep everything - sequence incomplete!

        Partial sequence (will be kept):
        >>> messages = [user_msg, ai_with_tools, tool_result]
        >>> remove_tool_messages(messages)
        [user_msg, ai_with_tools, tool_result]  # Keep - no final AI response!
    """
    if not messages:
        return messages

    # Step 1: Identify indices to remove by scanning for COMPLETE sequences
    indices_to_remove = set()
    i = 0

    while i < len(messages):
        msg = messages[i]

        # Look for AI message with tool calls (potential sequence start)
        if msg.role == "assistant" and msg.tools_calls:
            sequence_start = i
            i += 1

            # Collect all following tool result messages
            tool_result_indices = []
            while i < len(messages) and messages[i].role == "tool":
                tool_result_indices.append(i)
                i += 1

            # Check if there's a final AI response (without tool_calls)
            has_final_response = (
                i < len(messages)
                and messages[i].role == "assistant"
                and not messages[i].tools_calls
            )
            if has_final_response:
                # COMPLETE SEQUENCE FOUND!
                # Mark AI with tool_calls and all tool results for removal
                indices_to_remove.add(sequence_start)
                indices_to_remove.update(tool_result_indices)
                # Note: We keep the final AI response (index i)
                i += 1  # Move past the final AI response
            else:
                # INCOMPLETE SEQUENCE - keep everything
                # Don't add anything to indices_to_remove
                # i is already positioned correctly (at next message or end)
                pass
        else:
            i += 1

    # Step 2: Build filtered list excluding marked indices
    return [msg for idx, msg in enumerate(messages) if idx not in indices_to_remove]
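To exercise the complete-vs-incomplete distinction concretely, the scan can be restated against a minimal `Msg` stand-in (illustrative only; the real reducer operates on agentflow `Message` objects with the same `role`/`tools_calls` fields):

```python
from dataclasses import dataclass, field

@dataclass
class Msg:  # hypothetical stand-in exposing the two fields the scan inspects
    role: str
    tools_calls: list = field(default_factory=list)

def remove_tool_messages(messages: list[Msg]) -> list[Msg]:
    """Drop an assistant-with-tool-calls message and its tool results
    only when a final tool-free assistant reply follows them."""
    remove, i = set(), 0
    while i < len(messages):
        m = messages[i]
        if m.role == "assistant" and m.tools_calls:
            start, i = i, i + 1
            tool_idx = []
            while i < len(messages) and messages[i].role == "tool":
                tool_idx.append(i)
                i += 1
            if i < len(messages) and messages[i].role == "assistant" and not messages[i].tools_calls:
                remove.add(start)
                remove.update(tool_idx)
                i += 1  # the final assistant reply itself is kept
        else:
            i += 1
    return [m for j, m in enumerate(messages) if j not in remove]

complete = [Msg("user"), Msg("assistant", [{"name": "search"}]), Msg("tool"), Msg("assistant")]
partial = complete[:-1]  # no final assistant reply yet: everything is kept
```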

replace_messages

replace_messages(left, right)

Replaces the entire message list with a new one.

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages (ignored).

required

right

list[Message]

New list of messages.

required

Returns:

Type Description
list[Message]

list[Message]: The new message list.

Example

>>> replace_messages([msg1], [msg2])
[msg2]

Source code in agentflow/state/reducers.py (lines 37-52)
def replace_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Replaces the entire message list with a new one.

    Args:
        left (list[Message]): Existing list of messages (ignored).
        right (list[Message]): New list of messages.

    Returns:
        list[Message]: The new message list.

    Example:
        >>> replace_messages([msg1], [msg2])
        [msg2]
    """
    return right

replace_value

replace_value(left, right)

Replaces a value with another.

Parameters:

Name Type Description Default

left

Existing value (ignored).

required

right

New value to use.

required

Returns:

Name Type Description
Any

The new value.

Example

>>> replace_value(1, 2)
2

Source code in agentflow/state/reducers.py (lines 75-90)
def replace_value(left, right):
    """
    Replaces a value with another.

    Args:
        left: Existing value (ignored).
        right: New value to use.

    Returns:
        Any: The new value.

    Example:
        >>> replace_value(1, 2)
        2
    """
    return right

Modules

agent_state

Agent state schema for TAF agent graphs.

This module provides the AgentState class, which tracks message context, context summaries, and internal execution metadata for agent workflows. Supports subclassing for custom application fields.

Classes:

Name Description
AgentState

Common state schema that includes messages, context and internal execution metadata.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

AgentState

Bases: BaseModel

Common state schema that includes messages, context and internal execution metadata.

This class can be subclassed to add application-specific fields while maintaining compatibility with the TAF framework. All internal execution metadata is preserved through subclassing.

Notes:

- execution_meta contains internal-only execution progress and interrupt info.
- Users may subclass AgentState to add application fields; internal exec meta remains available to the runtime and will be persisted with the state.
- When subclassing, add your fields but keep the core fields intact.

Example

class MyCustomState(AgentState):
    user_data: dict = Field(default_factory=dict)
    custom_field: str = "default"

Methods:

Name Description
advance_step

Advance the execution step in the metadata.

clear_interrupt

Clear any interrupt in the execution metadata.

complete

Mark the agent state as completed.

error

Mark the agent state as errored.

is_interrupted

Check if the agent state is currently interrupted.

is_running

Check if the agent state is currently running.

is_stopped_requested

Check if a stop has been requested for the agent state.

set_current_node

Set the current node in the execution metadata.

set_interrupt

Set an interrupt in the execution metadata.

Attributes:

Name Type Description
context Annotated[list[Message], add_messages]
context_summary str | None
execution_meta ExecutionState
Source code in agentflow/state/agent_state.py (lines 26-138)
class AgentState(BaseModel):
    """Common state schema that includes messages, context and internal execution metadata.

    This class can be subclassed to add application-specific fields while maintaining
    compatibility with the TAF framework. All internal execution metadata
    is preserved through subclassing.

    Notes:
    - `execution_meta` contains internal-only execution progress and interrupt info.
    - Users may subclass `AgentState` to add application fields; internal exec meta remains
      available to the runtime and will be persisted with the state.
    - When subclassing, add your fields but keep the core fields intact.

    Example:
        class MyCustomState(AgentState):
            user_data: dict = Field(default_factory=dict)
            custom_field: str = "default"
    """

    context: Annotated[list[Message], add_messages] = Field(default_factory=list)
    context_summary: str | None = None
    # Internal execution metadata (kept private-ish but accessible to runtime)
    execution_meta: ExecMeta = Field(default_factory=lambda: ExecMeta(current_node=START))

    # Convenience delegation methods for execution meta so callers can use the same API
    def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
        """
        Set an interrupt in the execution metadata.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status: Execution status to set.
            data (dict | None): Optional additional interrupt data.
        """
        logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
        self.execution_meta.set_interrupt(node, reason, status, data)

    def clear_interrupt(self) -> None:
        """
        Clear any interrupt in the execution metadata.
        """
        logger.debug("Clearing interrupt")
        self.execution_meta.clear_interrupt()

    def is_running(self) -> bool:
        """
        Check if the agent state is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.execution_meta.is_running()
        logger.debug("State is_running: %s", running)
        return running

    def is_interrupted(self) -> bool:
        """
        Check if the agent state is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.execution_meta.is_interrupted()
        logger.debug("State is_interrupted: %s", interrupted)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance the execution step in the metadata.
        """
        old_step = self.execution_meta.step
        self.execution_meta.advance_step()
        logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)

    def set_current_node(self, node: str) -> None:
        """
        Set the current node in the execution metadata.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.execution_meta.current_node
        self.execution_meta.set_current_node(node)
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark the agent state as completed.
        """
        logger.info("Marking state as completed")
        self.execution_meta.complete()

    def error(self, error_msg: str) -> None:
        """
        Mark the agent state as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Setting state error: %s", error_msg)
        self.execution_meta.error(error_msg)

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for the agent state.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.execution_meta.is_stopped_requested()
        logger.debug("State is_stopped_requested: %s", stopped)
        return stopped
Attributes
context class-attribute instance-attribute
context = Field(default_factory=list)
context_summary class-attribute instance-attribute
context_summary = None
execution_meta class-attribute instance-attribute
execution_meta = Field(default_factory=lambda: ExecutionState(current_node=START))
Functions
advance_step
advance_step()

Advance the execution step in the metadata.

Source code in agentflow/state/agent_state.py (lines 93-99)
def advance_step(self) -> None:
    """
    Advance the execution step in the metadata.
    """
    old_step = self.execution_meta.step
    self.execution_meta.advance_step()
    logger.debug("Advanced step from %d to %d", old_step, self.execution_meta.step)
clear_interrupt
clear_interrupt()

Clear any interrupt in the execution metadata.

Source code in agentflow/state/agent_state.py (lines 64-69)
def clear_interrupt(self) -> None:
    """
    Clear any interrupt in the execution metadata.
    """
    logger.debug("Clearing interrupt")
    self.execution_meta.clear_interrupt()
complete
complete()

Mark the agent state as completed.

Source code in agentflow/state/agent_state.py (lines 112-117)
def complete(self) -> None:
    """
    Mark the agent state as completed.
    """
    logger.info("Marking state as completed")
    self.execution_meta.complete()
error
error(error_msg)

Mark the agent state as errored.

Parameters:

Name Type Description Default
error_msg str

Error message to record.

required
Source code in agentflow/state/agent_state.py
def error(self, error_msg: str) -> None:
    """
    Mark the agent state as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Setting state error: %s", error_msg)
    self.execution_meta.error(error_msg)
is_interrupted
is_interrupted()

Check if the agent state is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in agentflow/state/agent_state.py
def is_interrupted(self) -> bool:
    """
    Check if the agent state is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.execution_meta.is_interrupted()
    logger.debug("State is_interrupted: %s", interrupted)
    return interrupted
is_running
is_running()

Check if the agent state is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in agentflow/state/agent_state.py
def is_running(self) -> bool:
    """
    Check if the agent state is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.execution_meta.is_running()
    logger.debug("State is_running: %s", running)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for the agent state.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in agentflow/state/agent_state.py
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for the agent state.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.execution_meta.is_stopped_requested()
    logger.debug("State is_stopped_requested: %s", stopped)
    return stopped
set_current_node
set_current_node(node)

Set the current node in the execution metadata.

Parameters:

Name Type Description Default
node str

Node to set as current.

required
Source code in agentflow/state/agent_state.py
def set_current_node(self, node: str) -> None:
    """
    Set the current node in the execution metadata.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.execution_meta.current_node
    self.execution_meta.set_current_node(node)
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set an interrupt in the execution metadata.

Parameters:

Name Type Description Default
node str

Node where the interrupt occurred.

required
reason str

Reason for the interrupt.

required
status

Execution status to set.

required
data dict | None

Optional additional interrupt data.

None
Source code in agentflow/state/agent_state.py
def set_interrupt(self, node: str, reason: str, status, data: dict | None = None) -> None:
    """
    Set an interrupt in the execution metadata.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status: Execution status to set.
        data (dict | None): Optional additional interrupt data.
    """
    logger.debug("Setting interrupt at node '%s' with reason: %s", node, reason)
    self.execution_meta.set_interrupt(node, reason, status, data)

Functions

base_context

Abstract base class for context management in TAF agent graphs.

This module provides BaseContextManager, which defines the interface for trimming and managing message context in agent state objects.

Classes:

Name Description
BaseContextManager

Abstract base class for context management in AI interactions.

Attributes:

Name Type Description
S
logger

Attributes

S module-attribute
S = TypeVar('S', bound=AgentState)
logger module-attribute
logger = getLogger(__name__)

Classes

BaseContextManager

Bases: ABC

Abstract base class for context management in AI interactions.

Subclasses should implement trim_context as either a synchronous or asynchronous method. Generic over AgentState or its subclasses.

Methods:

Name Description
atrim_context

Trim context based on message count asynchronously.

trim_context

Trim context based on message count. Can be sync or async.

Source code in agentflow/state/base_context.py
class BaseContextManager[S](ABC):
    """
    Abstract base class for context management in AI interactions.

    Subclasses should implement `trim_context` as either a synchronous or asynchronous method.
    Generic over AgentState or its subclasses.
    """

    @abstractmethod
    def trim_context(self, state: S) -> S:
        """
        Trim context based on message count. Can be sync or async.

        Subclasses may implement as either a synchronous or asynchronous method.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context, either directly or as an awaitable.
        """
        raise NotImplementedError("Subclasses must implement this method (sync or async)")

    @abstractmethod
    async def atrim_context(self, state: S) -> S:
        """
        Trim context based on message count asynchronously.

        Args:
            state: The state containing context to be trimmed.

        Returns:
            The state with trimmed context.
        """
        raise NotImplementedError("Subclasses must implement this method")
Functions
atrim_context abstractmethod async
atrim_context(state)

Trim context based on message count asynchronously.

Parameters:

Name Type Description Default
state S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context.

Source code in agentflow/state/base_context.py
@abstractmethod
async def atrim_context(self, state: S) -> S:
    """
    Trim context based on message count asynchronously.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context.
    """
    raise NotImplementedError("Subclasses must implement this method")
trim_context abstractmethod
trim_context(state)

Trim context based on message count. Can be sync or async.

Subclasses may implement as either a synchronous or asynchronous method.

Parameters:

Name Type Description Default
state S

The state containing context to be trimmed.

required

Returns:

Type Description
S

The state with trimmed context, either directly or as an awaitable.

Source code in agentflow/state/base_context.py
@abstractmethod
def trim_context(self, state: S) -> S:
    """
    Trim context based on message count. Can be sync or async.

    Subclasses may implement as either a synchronous or asynchronous method.

    Args:
        state: The state containing context to be trimmed.

    Returns:
        The state with trimmed context, either directly or as an awaitable.
    """
    raise NotImplementedError("Subclasses must implement this method (sync or async)")
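
As a concrete illustration, a sliding-window strategy is one common way to implement this interface. The standalone sketch below mirrors the `trim_context`/`atrim_context` pair under simplified assumptions: `WindowContextManager`, `max_messages`, and the `_State` stand-in are illustrative names, not part of TAF.

```python
from dataclasses import dataclass, field


@dataclass
class _State:
    # Stand-in for AgentState: only the context field matters here.
    context: list = field(default_factory=list)


class WindowContextManager:
    """Keep only the most recent `max_messages` entries in state.context."""

    def __init__(self, max_messages: int = 10) -> None:
        self.max_messages = max_messages

    def trim_context(self, state: _State) -> _State:
        # Drop everything except the trailing window of messages.
        if len(state.context) > self.max_messages:
            state.context = state.context[-self.max_messages:]
        return state

    async def atrim_context(self, state: _State) -> _State:
        # The async variant can simply reuse the synchronous logic here.
        return self.trim_context(state)


state = _State(context=list(range(25)))
trimmed = WindowContextManager(max_messages=10).trim_context(state)
print(len(trimmed.context))  # 10
```

A real subclass would typically trim `Message` objects while preserving any system prompt; the windowing logic stays the same.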

execution_state

Execution state management for graph execution in TAF.

This module provides the ExecutionState class and related enums to track progress, interruptions, and pause/resume functionality for agent graph execution.

Classes:

Name Description
ExecutionState

Tracks the internal execution state of a graph.

ExecutionStatus

Status of graph execution.

StopRequestStatus

Status of stop requests for graph execution.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

ExecutionState

Bases: BaseModel

Tracks the internal execution state of a graph.

This class manages the execution progress, interrupt status, and internal data that should not be exposed to users.

Methods:

Name Description
advance_step

Advance to the next execution step.

clear_interrupt

Clear the interrupt state and resume execution.

complete

Mark execution as completed.

error

Mark execution as errored.

from_dict

Create an ExecutionState instance from a dictionary.

is_interrupted

Check if execution is currently interrupted.

is_running

Check if execution is currently running.

is_stopped_requested

Check if a stop has been requested for execution.

set_current_node

Update the current node in execution state.

set_interrupt

Set the interrupt state for execution.

Attributes:

Name Type Description
current_node str
internal_data dict[str, Any]
interrupt_data dict[str, Any] | None
interrupt_reason str | None
interrupted_node str | None
status ExecutionStatus
step int
stop_current_execution StopRequestStatus
thread_id str | None
Source code in agentflow/state/execution_state.py
class ExecutionState(BaseModel):
    """
    Tracks the internal execution state of a graph.

    This class manages the execution progress, interrupt status, and internal
    data that should not be exposed to users.
    """

    # Core execution tracking
    current_node: str
    step: int = 0
    status: ExecutionStatus = ExecutionStatus.RUNNING

    # Interrupt management
    interrupted_node: str | None = None
    interrupt_reason: str | None = None
    interrupt_data: dict[str, Any] | None = None

    # Thread/session identification
    thread_id: str | None = None

    # Stop Current Execution Flag
    stop_current_execution: StopRequestStatus = StopRequestStatus.NONE

    # Internal execution data (hidden from user)
    internal_data: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
        """
        Create an ExecutionState instance from a dictionary.

        Args:
            data (dict[str, Any]): Dictionary containing execution state fields.

        Returns:
            ExecutionState: The deserialized execution state object.
        """
        return cls.model_validate(
            {
                "current_node": data["current_node"],
                "step": data.get("step", 0),
                "status": ExecutionStatus(data.get("status", "running")),
                "interrupted_node": data.get("interrupted_node"),
                "interrupt_reason": data.get("interrupt_reason"),
                "interrupt_data": data.get("interrupt_data"),
                "thread_id": data.get("thread_id"),
                "internal_data": data.get("_internal_data", {}),
            }
        )

    def set_interrupt(
        self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
    ) -> None:
        """
        Set the interrupt state for execution.

        Args:
            node (str): Node where the interrupt occurred.
            reason (str): Reason for the interrupt.
            status (ExecutionStatus): Status to set for the interrupt.
            data (dict[str, Any] | None): Optional additional interrupt data.
        """
        logger.debug(
            "Setting interrupt: node='%s', reason='%s', status='%s'",
            node,
            reason,
            status.value,
        )
        self.interrupted_node = node
        self.interrupt_reason = reason
        self.status = status
        self.interrupt_data = data

    def clear_interrupt(self) -> None:
        """
        Clear the interrupt state and resume execution.
        """
        logger.debug("Clearing interrupt, resuming execution")
        self.interrupted_node = None
        self.interrupt_reason = None
        self.interrupt_data = None
        self.status = ExecutionStatus.RUNNING

    def is_interrupted(self) -> bool:
        """
        Check if execution is currently interrupted.

        Returns:
            bool: True if interrupted, False otherwise.
        """
        interrupted = self.status in [
            ExecutionStatus.INTERRUPTED_BEFORE,
            ExecutionStatus.INTERRUPTED_AFTER,
        ]
        logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
        return interrupted

    def advance_step(self) -> None:
        """
        Advance to the next execution step.
        """
        old_step = self.step
        self.step += 1
        logger.debug("Advanced step from %d to %d", old_step, self.step)

    def set_current_node(self, node: str) -> None:
        """
        Update the current node in execution state.

        Args:
            node (str): Node to set as current.
        """
        old_node = self.current_node
        self.current_node = node
        logger.debug("Changed current node from '%s' to '%s'", old_node, node)

    def complete(self) -> None:
        """
        Mark execution as completed.
        """
        logger.info("Marking execution as completed")
        self.status = ExecutionStatus.COMPLETED

    def error(self, error_msg: str) -> None:
        """
        Mark execution as errored.

        Args:
            error_msg (str): Error message to record.
        """
        logger.error("Marking execution as errored: %s", error_msg)
        self.status = ExecutionStatus.ERROR
        self.internal_data["error"] = error_msg

    def is_running(self) -> bool:
        """
        Check if execution is currently running.

        Returns:
            bool: True if running, False otherwise.
        """
        running = self.status == ExecutionStatus.RUNNING
        logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
        return running

    def is_stopped_requested(self) -> bool:
        """
        Check if a stop has been requested for execution.

        Returns:
            bool: True if stop requested, False otherwise.
        """
        stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
        logger.debug(
            "Execution is_stopped_requested: %s (stop_current_execution: %s)",
            stopped,
            self.stop_current_execution.value,
        )
        return stopped
Attributes
current_node instance-attribute
current_node
internal_data class-attribute instance-attribute
internal_data = Field(default_factory=dict)
interrupt_data class-attribute instance-attribute
interrupt_data = None
interrupt_reason class-attribute instance-attribute
interrupt_reason = None
interrupted_node class-attribute instance-attribute
interrupted_node = None
status class-attribute instance-attribute
status = RUNNING
step class-attribute instance-attribute
step = 0
stop_current_execution class-attribute instance-attribute
stop_current_execution = NONE
thread_id class-attribute instance-attribute
thread_id = None
Functions
advance_step
advance_step()

Advance to the next execution step.

Source code in agentflow/state/execution_state.py
def advance_step(self) -> None:
    """
    Advance to the next execution step.
    """
    old_step = self.step
    self.step += 1
    logger.debug("Advanced step from %d to %d", old_step, self.step)
clear_interrupt
clear_interrupt()

Clear the interrupt state and resume execution.

Source code in agentflow/state/execution_state.py
def clear_interrupt(self) -> None:
    """
    Clear the interrupt state and resume execution.
    """
    logger.debug("Clearing interrupt, resuming execution")
    self.interrupted_node = None
    self.interrupt_reason = None
    self.interrupt_data = None
    self.status = ExecutionStatus.RUNNING
complete
complete()

Mark execution as completed.

Source code in agentflow/state/execution_state.py
def complete(self) -> None:
    """
    Mark execution as completed.
    """
    logger.info("Marking execution as completed")
    self.status = ExecutionStatus.COMPLETED
error
error(error_msg)

Mark execution as errored.

Parameters:

Name Type Description Default
error_msg str

Error message to record.

required
Source code in agentflow/state/execution_state.py
def error(self, error_msg: str) -> None:
    """
    Mark execution as errored.

    Args:
        error_msg (str): Error message to record.
    """
    logger.error("Marking execution as errored: %s", error_msg)
    self.status = ExecutionStatus.ERROR
    self.internal_data["error"] = error_msg
from_dict classmethod
from_dict(data)

Create an ExecutionState instance from a dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary containing execution state fields.

required

Returns:

Name Type Description
ExecutionState ExecutionState

The deserialized execution state object.

Source code in agentflow/state/execution_state.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ExecutionState":
    """
    Create an ExecutionState instance from a dictionary.

    Args:
        data (dict[str, Any]): Dictionary containing execution state fields.

    Returns:
        ExecutionState: The deserialized execution state object.
    """
    return cls.model_validate(
        {
            "current_node": data["current_node"],
            "step": data.get("step", 0),
            "status": ExecutionStatus(data.get("status", "running")),
            "interrupted_node": data.get("interrupted_node"),
            "interrupt_reason": data.get("interrupt_reason"),
            "interrupt_data": data.get("interrupt_data"),
            "thread_id": data.get("thread_id"),
            "internal_data": data.get("_internal_data", {}),
        }
    )
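
The same deserialization pattern — one required key, defaulted optional keys, and string-to-enum coercion — can be sketched with the standard library alone. The `Status` enum and `State` dataclass below are simplified stand-ins, not TAF classes.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class Status(Enum):
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class State:
    current_node: str
    step: int = 0
    status: Status = Status.RUNNING
    internal_data: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "State":
        return cls(
            current_node=data["current_node"],              # required: raises KeyError if absent
            step=data.get("step", 0),                       # defaulted when missing
            status=Status(data.get("status", "running")),   # string -> enum coercion
            internal_data=data.get("_internal_data", {}),   # serialized under an underscored key
        )


s = State.from_dict({"current_node": "plan", "status": "completed"})
print(s.status)  # Status.COMPLETED
```

Note that `from_dict` reads `internal_data` from the `_internal_data` key, matching the asymmetric key naming in the TAF source above.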
is_interrupted
is_interrupted()

Check if execution is currently interrupted.

Returns:

Name Type Description
bool bool

True if interrupted, False otherwise.

Source code in agentflow/state/execution_state.py
def is_interrupted(self) -> bool:
    """
    Check if execution is currently interrupted.

    Returns:
        bool: True if interrupted, False otherwise.
    """
    interrupted = self.status in [
        ExecutionStatus.INTERRUPTED_BEFORE,
        ExecutionStatus.INTERRUPTED_AFTER,
    ]
    logger.debug("Execution is_interrupted: %s (status: %s)", interrupted, self.status.value)
    return interrupted
is_running
is_running()

Check if execution is currently running.

Returns:

Name Type Description
bool bool

True if running, False otherwise.

Source code in agentflow/state/execution_state.py
def is_running(self) -> bool:
    """
    Check if execution is currently running.

    Returns:
        bool: True if running, False otherwise.
    """
    running = self.status == ExecutionStatus.RUNNING
    logger.debug("Execution is_running: %s (status: %s)", running, self.status.value)
    return running
is_stopped_requested
is_stopped_requested()

Check if a stop has been requested for execution.

Returns:

Name Type Description
bool bool

True if stop requested, False otherwise.

Source code in agentflow/state/execution_state.py
def is_stopped_requested(self) -> bool:
    """
    Check if a stop has been requested for execution.

    Returns:
        bool: True if stop requested, False otherwise.
    """
    stopped = self.stop_current_execution == StopRequestStatus.STOP_REQUESTED
    logger.debug(
        "Execution is_stopped_requested: %s (stop_current_execution: %s)",
        stopped,
        self.stop_current_execution.value,
    )
    return stopped
set_current_node
set_current_node(node)

Update the current node in execution state.

Parameters:

Name Type Description Default
node str

Node to set as current.

required
Source code in agentflow/state/execution_state.py
def set_current_node(self, node: str) -> None:
    """
    Update the current node in execution state.

    Args:
        node (str): Node to set as current.
    """
    old_node = self.current_node
    self.current_node = node
    logger.debug("Changed current node from '%s' to '%s'", old_node, node)
set_interrupt
set_interrupt(node, reason, status, data=None)

Set the interrupt state for execution.

Parameters:

Name Type Description Default
node str

Node where the interrupt occurred.

required
reason str

Reason for the interrupt.

required
status ExecutionStatus

Status to set for the interrupt.

required
data dict[str, Any] | None

Optional additional interrupt data.

None
Source code in agentflow/state/execution_state.py
def set_interrupt(
    self, node: str, reason: str, status: ExecutionStatus, data: dict[str, Any] | None = None
) -> None:
    """
    Set the interrupt state for execution.

    Args:
        node (str): Node where the interrupt occurred.
        reason (str): Reason for the interrupt.
        status (ExecutionStatus): Status to set for the interrupt.
        data (dict[str, Any] | None): Optional additional interrupt data.
    """
    logger.debug(
        "Setting interrupt: node='%s', reason='%s', status='%s'",
        node,
        reason,
        status.value,
    )
    self.interrupted_node = node
    self.interrupt_reason = reason
    self.status = status
    self.interrupt_data = data
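
The `set_interrupt`/`clear_interrupt` pair implements a simple pause/resume protocol: interrupting records where and why execution paused, and clearing always transitions back to `RUNNING`. The standalone sketch below mirrors those state transitions; it follows the field names above but is not the TAF implementation.

```python
from enum import Enum


class Status(Enum):
    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"


class ExecState:
    def __init__(self) -> None:
        self.status = Status.RUNNING
        self.interrupted_node = None

    def set_interrupt(self, node: str, status: Status) -> None:
        # Record where execution paused and why.
        self.interrupted_node = node
        self.status = status

    def clear_interrupt(self) -> None:
        # Clearing always resumes to RUNNING.
        self.interrupted_node = None
        self.status = Status.RUNNING

    def is_interrupted(self) -> bool:
        return self.status in (Status.INTERRUPTED_BEFORE, Status.INTERRUPTED_AFTER)


es = ExecState()
es.set_interrupt("approval_node", Status.INTERRUPTED_BEFORE)
print(es.is_interrupted())  # True
es.clear_interrupt()
print(es.is_interrupted())  # False
```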
ExecutionStatus

Bases: Enum

Status of graph execution.

Attributes:

Name Type Description
COMPLETED
ERROR
INTERRUPTED_AFTER
INTERRUPTED_BEFORE
RUNNING
Source code in agentflow/state/execution_state.py
class ExecutionStatus(Enum):
    """Status of graph execution."""

    RUNNING = "running"
    INTERRUPTED_BEFORE = "interrupted_before"
    INTERRUPTED_AFTER = "interrupted_after"
    COMPLETED = "completed"
    ERROR = "error"
Attributes
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED_AFTER class-attribute instance-attribute
INTERRUPTED_AFTER = 'interrupted_after'
INTERRUPTED_BEFORE class-attribute instance-attribute
INTERRUPTED_BEFORE = 'interrupted_before'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
StopRequestStatus

Bases: Enum

Status of stop requests for graph execution.

Attributes:

Name Type Description
NONE
STOPPED
STOP_REQUESTED
Source code in agentflow/state/execution_state.py
class StopRequestStatus(Enum):
    """Status of stop requests for graph execution."""

    NONE = "none"
    STOP_REQUESTED = "stop_requested"
    STOPPED = "stopped"
Attributes
NONE class-attribute instance-attribute
NONE = 'none'
STOPPED class-attribute instance-attribute
STOPPED = 'stopped'
STOP_REQUESTED class-attribute instance-attribute
STOP_REQUESTED = 'stop_requested'
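
Together, the two enums support cooperative cancellation: a runner checks the stop flag between steps rather than killing execution mid-node. A minimal sketch of that loop, assuming the three-state flag above (the `run` function itself is illustrative, not TAF API):

```python
from enum import Enum


class StopRequestStatus(Enum):
    NONE = "none"
    STOP_REQUESTED = "stop_requested"
    STOPPED = "stopped"


def run(steps: int, stop_at: int) -> tuple[int, StopRequestStatus]:
    """Execute up to `steps` steps, honoring a stop request raised at step `stop_at`."""
    flag = StopRequestStatus.NONE
    completed = 0
    for step in range(steps):
        # Check between steps: the current step always finishes cleanly.
        if flag == StopRequestStatus.STOP_REQUESTED:
            flag = StopRequestStatus.STOPPED  # acknowledge, then exit
            break
        completed += 1
        if step == stop_at:  # e.g. a user requested a stop during this step
            flag = StopRequestStatus.STOP_REQUESTED
    return completed, flag


done, flag = run(steps=10, stop_at=2)
print(done, flag)  # 3 StopRequestStatus.STOPPED
```

Because the flag is only inspected at step boundaries, a request raised mid-step lets that step complete before execution halts.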

message

Message and content block primitives for agent graphs.

This module defines the core message representation, multimodal content blocks, token usage tracking, and utility functions for agent graph communication.

Classes:

Name Description
TokenUsages

Tracks token usage statistics for a message or model response.

MediaRef

Reference to media content (image/audio/video/document/data).

AnnotationRef

Reference to annotation metadata.

Message

Represents a message in a conversation, including content, role, metadata, and token usage.

Functions:

Name Description
generate_id

Generates a message or tool call ID based on DI context and type.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

Message

Bases: BaseModel

Represents a message in a conversation, including content, role, metadata, and token usage.

Attributes:

Name Type Description
message_id str | int

Unique identifier for the message.

role Literal['user', 'assistant', 'system', 'tool']

The role of the message sender.

content list[ContentBlock]

The message content blocks.

delta bool

Indicates if this is a delta/partial message.

tools_calls list[dict[str, Any]] | None

Tool call information, if any.

reasoning str | None

Reasoning or explanation, if any.

timestamp float | None

Unix timestamp of the message.

metadata dict[str, Any]

Additional metadata.

usages TokenUsages | None

Token usage statistics.

raw dict[str, Any] | None

Raw data, if any.

Example

msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])

Methods:

Name Description
attach_media

Append a media block to the content.

text

Best-effort text extraction from content blocks.

text_message

Create a Message instance from plain text.

tool_message

Create a tool message, optionally marking it as an error.

Source code in agentflow/state/message.py
class Message(BaseModel):
    """
    Represents a message in a conversation, including content, role, metadata, and token usage.

    Attributes:
        message_id (str | int): Unique identifier for the message.
        role (Literal["user", "assistant", "system", "tool"]): The role of the message sender.
        content (list[ContentBlock]): The message content blocks.
        delta (bool): Indicates if this is a delta/partial message.
        tools_calls (list[dict[str, Any]] | None): Tool call information, if any.
        reasoning (str | None): Reasoning or explanation, if any.
        timestamp (float | None): Unix timestamp of the message.
        metadata (dict[str, Any]): Additional metadata.
        usages (TokenUsages | None): Token usage statistics.
        raw (dict[str, Any] | None): Raw data, if any.

    Example:
        >>> msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])
    """

    message_id: str | int = Field(default_factory=lambda: generate_id(None))
    role: Literal["user", "assistant", "system", "tool"]
    content: list[ContentBlock]
    delta: bool = False  # Indicates if this is a delta/partial message
    tools_calls: list[dict[str, Any]] | None = None
    reasoning: str | None = None  # Remove it
    timestamp: float | None = Field(default_factory=lambda: datetime.now().timestamp())
    metadata: dict[str, Any] = Field(default_factory=dict)
    usages: TokenUsages | None = None
    raw: dict[str, Any] | None = None

    @classmethod
    def text_message(
        cls,
        content: str,
        role: Literal["user", "assistant", "system", "tool"] = "user",
        message_id: str | None = None,
    ) -> "Message":
        """
        Create a Message instance from plain text.

        Args:
            content (str): The message content.
            role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
            message_id (str | None): Optional message ID.

        Returns:
            Message: The created Message instance.

        Example:
            >>> Message.text_message("Hello!", role="user")
        """
        logger.debug("Creating message from text with role: %s", role)
        return cls(
            message_id=generate_id(message_id),
            role=role,
            content=[TextBlock(text=content)],
            metadata={},
        )

    @classmethod
    def tool_message(
        cls,
        content: list[ContentBlock],
        message_id: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> "Message":
        """
        Create a tool message, optionally marking it as an error.

        Args:
            content (list[ContentBlock]): The message content blocks.
            message_id (str | None): Optional message ID.
            meta (dict[str, Any] | None): Optional metadata.

        Returns:
            Message: The created tool message instance.

        Example:
            >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
        """
        res = content
        msg_id = generate_id(message_id)
        return cls(
            message_id=msg_id,
            role="tool",
            content=res,
            metadata=meta or {},
        )

    # --- Convenience helpers ---
    def text(self) -> str:
        """
        Best-effort text extraction from content blocks.

        Returns:
            str: Concatenated text from TextBlock and ToolResultBlock outputs.

        Example:
            >>> msg.text()
            'Hello!Result text.'
        """
        parts: list[str] = []
        for block in self.content:
            if isinstance(block, TextBlock):
                parts.append(block.text)
            elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
                parts.append(block.output)
        return "".join(parts)

    def attach_media(
        self,
        media: MediaRef,
        as_type: Literal["image", "audio", "video", "document"],
    ) -> None:
        """
        Append a media block to the content.

        If content was text, creates a block list. Supports image, audio, video, and document types.

        Args:
            media (MediaRef): Reference to media content.
            as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

        Returns:
            None

        Raises:
            ValueError: If an unsupported media type is provided.

        Example:
            >>> msg.attach_media(media_ref, as_type="image")
        """
        block: ContentBlock
        if as_type == "image":
            block = ImageBlock(media=media)
        elif as_type == "audio":
            block = AudioBlock(media=media)
        elif as_type == "video":
            block = VideoBlock(media=media)
        elif as_type == "document":
            block = DocumentBlock(media=media)
        else:
            raise ValueError(f"Unsupported media type: {as_type}")

        if isinstance(self.content, str):
            self.content = [TextBlock(text=self.content), block]
        elif isinstance(self.content, list):
            self.content.append(block)
        else:
            self.content = [block]
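
The `text()` helper's best-effort extraction — keep `TextBlock` text and string tool outputs, skip everything else — can be sketched standalone. The block classes below are simplified stand-ins for TAF's content blocks.

```python
from dataclasses import dataclass


@dataclass
class TextBlock:
    text: str


@dataclass
class ToolResultBlock:
    output: object  # may be a str or structured data


def extract_text(blocks: list) -> str:
    """Concatenate TextBlock text and string tool outputs, skipping other blocks."""
    parts: list[str] = []
    for block in blocks:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)


blocks = [TextBlock("Hello! "), ToolResultBlock({"rows": 3}), ToolResultBlock("Done.")]
print(extract_text(blocks))  # Hello! Done.
```

Note that blocks are joined with no separator, so callers wanting readable output should include trailing whitespace in their text blocks.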
Attributes
content instance-attribute
content
delta class-attribute instance-attribute
delta = False
message_id class-attribute instance-attribute
message_id = Field(default_factory=lambda: generate_id(None))
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
raw class-attribute instance-attribute
raw = None
reasoning class-attribute instance-attribute
reasoning = None
role instance-attribute
role
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=lambda: timestamp())
tools_calls class-attribute instance-attribute
tools_calls = None
usages class-attribute instance-attribute
usages = None
Functions
attach_media
attach_media(media, as_type)

Append a media block to the content.

If content was text, creates a block list. Supports image, audio, video, and document types.

Parameters:

Name Type Description Default
media MediaRef

Reference to media content.

required
as_type Literal['image', 'audio', 'video', 'document']

Type of media block to append.

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If an unsupported media type is provided.

Example

msg.attach_media(media_ref, as_type="image")

Source code in agentflow/state/message.py
def attach_media(
    self,
    media: MediaRef,
    as_type: Literal["image", "audio", "video", "document"],
) -> None:
    """
    Append a media block to the content.

    If content was text, creates a block list. Supports image, audio, video, and document types.

    Args:
        media (MediaRef): Reference to media content.
        as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

    Returns:
        None

    Raises:
        ValueError: If an unsupported media type is provided.

    Example:
        >>> msg.attach_media(media_ref, as_type="image")
    """
    block: ContentBlock
    if as_type == "image":
        block = ImageBlock(media=media)
    elif as_type == "audio":
        block = AudioBlock(media=media)
    elif as_type == "video":
        block = VideoBlock(media=media)
    elif as_type == "document":
        block = DocumentBlock(media=media)
    else:
        raise ValueError(f"Unsupported media type: {as_type}")

    if isinstance(self.content, str):
        self.content = [TextBlock(text=self.content), block]
    elif isinstance(self.content, list):
        self.content.append(block)
    else:
        self.content = [block]
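For orientation, the content-promotion rule above can be sketched with plain stand-in classes (the dataclasses and `attach` helper below are illustrative, not the real agentflow models):

```python
from dataclasses import dataclass

@dataclass
class TextBlock:
    text: str

@dataclass
class ImageBlock:
    media: str

def attach(content, block):
    # Mirrors the three cases in attach_media: plain string content is
    # promoted to a block list, an existing list is appended to, and any
    # other value is replaced by a fresh single-block list.
    if isinstance(content, str):
        return [TextBlock(text=content), block]
    if isinstance(content, list):
        content.append(block)
        return content
    return [block]

blocks = attach("hello", ImageBlock(media="ref://img"))
print(len(blocks))  # 2
```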
text
text()

Best-effort text extraction from content blocks.

Returns:

Name Type Description
str str

Concatenated text from TextBlock and ToolResultBlock outputs.

Example

msg.text()
'Hello!Result text.'

Source code in agentflow/state/message.py
def text(self) -> str:
    """
    Best-effort text extraction from content blocks.

    Returns:
        str: Concatenated text from TextBlock and ToolResultBlock outputs.

    Example:
        >>> msg.text()
        'Hello!Result text.'
    """
    parts: list[str] = []
    for block in self.content:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)
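The traversal above only picks up text-bearing blocks; everything else is skipped. A minimal sketch using dict stand-ins for the block models:

```python
# Dict stand-ins for content blocks (illustrative shapes, not the
# real pydantic models).
blocks = [
    {"type": "text", "text": "Hello!"},
    {"type": "tool_result", "output": "Result text."},
    {"type": "image", "media": "ref://img"},      # skipped: no extractable text
    {"type": "tool_result", "output": {"k": 1}},  # skipped: non-string output
]

parts = []
for block in blocks:
    if block["type"] == "text":
        parts.append(block["text"])
    elif block["type"] == "tool_result" and isinstance(block["output"], str):
        parts.append(block["output"])

print("".join(parts))  # Hello!Result text.
```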
text_message classmethod
text_message(content, role='user', message_id=None)

Create a Message instance from plain text.

Parameters:

Name Type Description Default
content str

The message content.

required
role Literal['user', 'assistant', 'system', 'tool']

The role of the sender.

'user'
message_id str | None

Optional message ID.

None

Returns:

Name Type Description
Message Message

The created Message instance.

Example

Message.text_message("Hello!", role="user")

Source code in agentflow/state/message.py
@classmethod
def text_message(
    cls,
    content: str,
    role: Literal["user", "assistant", "system", "tool"] = "user",
    message_id: str | None = None,
) -> "Message":
    """
    Create a Message instance from plain text.

    Args:
        content (str): The message content.
        role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
        message_id (str | None): Optional message ID.

    Returns:
        Message: The created Message instance.

    Example:
        >>> Message.text_message("Hello!", role="user")
    """
    logger.debug("Creating message from text with role: %s", role)
    return cls(
        message_id=generate_id(message_id),
        role=role,
        content=[TextBlock(text=content)],
        metadata={},
    )
tool_message classmethod
tool_message(content, message_id=None, meta=None)

Create a tool message from content blocks, with optional metadata.

Parameters:

Name Type Description Default
content list[ContentBlock]

The message content blocks.

required
message_id str | None

Optional message ID.

None
meta dict[str, Any] | None

Optional metadata.

None

Returns:

Name Type Description
Message Message

The created tool message instance.

Example

Message.tool_message([ToolResultBlock(...)], message_id="tool1")

Source code in agentflow/state/message.py
@classmethod
def tool_message(
    cls,
    content: list[ContentBlock],
    message_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> "Message":
    """
    Create a tool message from content blocks, with optional metadata.

    Args:
        content (list[ContentBlock]): The message content blocks.
        message_id (str | None): Optional message ID.
        meta (dict[str, Any] | None): Optional metadata.

    Returns:
        Message: The created tool message instance.

    Example:
        >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
    """
    res = content
    msg_id = generate_id(message_id)
    return cls(
        message_id=msg_id,
        role="tool",
        content=res,
        metadata=meta or {},
    )
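A failed tool call is typically represented by passing a `ToolResultBlock` with `is_error=True` as the content. Sketched with dict stand-ins for the block and message models:

```python
# Dict stand-ins for ToolResultBlock and the resulting tool Message
# (field names follow the models documented here; values are illustrative).
result = {
    "type": "tool_result",
    "call_id": "call_1",
    "output": "division by zero",
    "is_error": True,
    "status": "failed",
}
message = {"role": "tool", "content": [result], "metadata": {"attempt": 1}}

# Downstream code can filter for failed results:
failed = [b for b in message["content"]
          if b["type"] == "tool_result" and b["is_error"]]
print(len(failed))  # 1
```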
TokenUsages

Bases: BaseModel

Tracks token usage statistics for a message or model response.

Attributes:

Name Type Description
completion_tokens int

Number of completion tokens used.

prompt_tokens int

Number of prompt tokens used.

total_tokens int

Total tokens used.

reasoning_tokens int

Reasoning tokens used (optional).

cache_creation_input_tokens int

Cache creation input tokens (optional).

cache_read_input_tokens int

Cache read input tokens (optional).

image_tokens int | None

Image tokens for multimodal models (optional).

audio_tokens int | None

Audio tokens for multimodal models (optional).

Example

usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)

Source code in agentflow/state/message.py
class TokenUsages(BaseModel):
    """
    Tracks token usage statistics for a message or model response.

    Attributes:
        completion_tokens (int): Number of completion tokens used.
        prompt_tokens (int): Number of prompt tokens used.
        total_tokens (int): Total tokens used.
        reasoning_tokens (int): Reasoning tokens used (optional).
        cache_creation_input_tokens (int): Cache creation input tokens (optional).
        cache_read_input_tokens (int): Cache read input tokens (optional).
        image_tokens (int | None): Image tokens for multimodal models (optional).
        audio_tokens (int | None): Audio tokens for multimodal models (optional).

    Example:
        >>> usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)
    """

    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    # Optional modality-specific usage fields for multimodal models
    image_tokens: int | None = 0
    audio_tokens: int | None = 0
Attributes
audio_tokens class-attribute instance-attribute
audio_tokens = 0
cache_creation_input_tokens class-attribute instance-attribute
cache_creation_input_tokens = 0
cache_read_input_tokens class-attribute instance-attribute
cache_read_input_tokens = 0
completion_tokens instance-attribute
completion_tokens
image_tokens class-attribute instance-attribute
image_tokens = 0
prompt_tokens instance-attribute
prompt_tokens
reasoning_tokens class-attribute instance-attribute
reasoning_tokens = 0
total_tokens instance-attribute
total_tokens
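Aggregating usage across a conversation is a common pattern; a minimal sketch using dict mirrors of `TokenUsages` (field names taken from the model above):

```python
# Per-turn usage records, mirroring TokenUsages fields.
usages = [
    {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30},
    {"prompt_tokens": 40, "completion_tokens": 5, "total_tokens": 45},
]

# Sum each counter across turns.
totals = {
    key: sum(u[key] for u in usages)
    for key in ("prompt_tokens", "completion_tokens", "total_tokens")
}
print(totals["total_tokens"])  # 75
```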

Functions

generate_id
generate_id(default_id)

Generate a message or tool call ID based on DI context and type.

Parameters:

Name Type Description Default
default_id str | int | None

Default ID to use if provided and matches type.

required

Returns:

Type Description
str | int

str | int: Generated or provided ID, type determined by DI context.

Example

generate_id("abc123")
'abc123'
generate_id(None)
'a-uuid-string'

Source code in agentflow/state/message.py
def generate_id(default_id: str | int | None) -> str | int:
    """
    Generate a message or tool call ID based on DI context and type.

    Args:
        default_id (str | int | None): Default ID to use if provided and matches type.

    Returns:
        str | int: Generated or provided ID, type determined by DI context.

    Example:
        >>> generate_id("abc123")
        'abc123'
        >>> generate_id(None)
        'a-uuid-string'
    """
    id_type = InjectQ.get_instance().try_get("generated_id_type", "string")
    generated_id = InjectQ.get_instance().try_get("generated_id", None)

    # if user provided an awaitable, resolve it
    if isinstance(generated_id, Awaitable):

        async def wait_for_id():
            return await generated_id

        generated_id = asyncio.run(wait_for_id())

    if generated_id:
        return generated_id

    if default_id:
        if id_type == "string" and isinstance(default_id, str):
            return default_id
        if id_type in ("int", "bigint") and isinstance(default_id, int):
            return default_id

    # if not matched or default_id is None, generate new id
    logger.debug(
        "Generating new id of type: %s. Default ID not provided or not matched %s",
        id_type,
        default_id,
    )

    if id_type == "int":
        return uuid4().int >> 32
    if id_type == "bigint":
        return uuid4().int >> 64
    return str(uuid4())
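The fallback branches at the end determine the shape of freshly generated IDs. A self-contained sketch (the InjectQ DI lookup is elided here and `id_type` is passed directly):

```python
from uuid import uuid4

def new_id(id_type: str):
    # Same fallback branches as generate_id's tail: shift the 128-bit
    # UUID down so the integer variants fit smaller ranges.
    if id_type == "int":
        return uuid4().int >> 32    # 96-bit integer
    if id_type == "bigint":
        return uuid4().int >> 64    # fits in 64 bits
    return str(uuid4())             # default: string UUID

print(type(new_id("string")).__name__)  # str
```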

message_block

Classes:

Name Description
AnnotationBlock

Annotation content block for messages.

AnnotationRef

Reference to annotation metadata (e.g., citation, note).

AudioBlock

Audio content block for messages.

DataBlock

Data content block for messages.

DocumentBlock

Document content block for messages.

ErrorBlock

Error content block for messages.

ImageBlock

Image content block for messages.

MediaRef

Reference to media content (image/audio/video/document/data).

ReasoningBlock

Reasoning content block for messages.

RemoteToolCallBlock

Remote Tool call content block for messages.

TextBlock

Text content block for messages.

ToolCallBlock

Tool call content block for messages.

ToolResultBlock

Tool result content block for messages.

VideoBlock

Video content block for messages.

Attributes:

Name Type Description
ContentBlock

Discriminated union of all content block types, dispatched on the `type` field.

Attributes

ContentBlock module-attribute
ContentBlock = Annotated[Union[TextBlock, ImageBlock, AudioBlock, VideoBlock, DocumentBlock, DataBlock, ToolCallBlock, RemoteToolCallBlock, ToolResultBlock, ReasoningBlock, AnnotationBlock, ErrorBlock], Field(discriminator='type')]
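Because every block carries a `type` literal, consumers can dispatch on it directly. A plain-dict sketch of that dispatch, mirroring what `Field(discriminator='type')` gives pydantic at validation time (the handler table is illustrative, not part of agentflow):

```python
# Map each discriminator value to a rendering function.
HANDLERS = {
    "text": lambda b: b["text"],
    "error": lambda b: "error: " + b["message"],
}

def render(block: dict) -> str:
    # Look up the handler by the block's `type` discriminator.
    handler = HANDLERS.get(block["type"])
    if handler is None:
        raise ValueError(f"unhandled block type: {block['type']}")
    return handler(block)

print(render({"type": "text", "text": "hi"}))        # hi
print(render({"type": "error", "message": "boom"}))  # error: boom
```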

Classes

AnnotationBlock

Bases: BaseModel

Annotation content block for messages.

Attributes:

Name Type Description
type Literal['annotation']

Block type discriminator.

kind Literal['citation', 'note']

Kind of annotation.

refs list[AnnotationRef]

List of annotation references.

spans list[tuple[int, int]] | None

Spans covered by the annotation.

Source code in agentflow/state/message_block.py
class AnnotationBlock(BaseModel):
    """
    Annotation content block for messages.

    Attributes:
        type (Literal["annotation"]): Block type discriminator.
        kind (Literal["citation", "note"]): Kind of annotation.
        refs (list[AnnotationRef]): List of annotation references.
        spans (list[tuple[int, int]] | None): Spans covered by the annotation.
    """

    type: Literal["annotation"] = "annotation"
    kind: Literal["citation", "note"] = "citation"
    refs: list[AnnotationRef] = Field(default_factory=list)
    spans: list[tuple[int, int]] | None = None
Attributes
kind class-attribute instance-attribute
kind = 'citation'
refs class-attribute instance-attribute
refs = Field(default_factory=list)
spans class-attribute instance-attribute
spans = None
type class-attribute instance-attribute
type = 'annotation'
AnnotationRef

Bases: BaseModel

Reference to annotation metadata (e.g., citation, note).

Attributes:

Name Type Description
url str | None

URL to annotation source.

file_id str | None

Provider-managed file ID.

page int | None

Page number (if applicable).

index int | None

Index within the annotation source.

title str | None

Title of the annotation.

Source code in agentflow/state/message_block.py
class AnnotationRef(BaseModel):
    """
    Reference to annotation metadata (e.g., citation, note).

    Attributes:
        url (str | None): URL to annotation source.
        file_id (str | None): Provider-managed file ID.
        page (int | None): Page number (if applicable).
        index (int | None): Index within the annotation source.
        title (str | None): Title of the annotation.
    """

    url: str | None = None
    file_id: str | None = None
    page: int | None = None
    index: int | None = None
    title: str | None = None
Attributes
file_id class-attribute instance-attribute
file_id = None
index class-attribute instance-attribute
index = None
page class-attribute instance-attribute
page = None
title class-attribute instance-attribute
title = None
url class-attribute instance-attribute
url = None
AudioBlock

Bases: BaseModel

Audio content block for messages.

Attributes:

Name Type Description
type Literal['audio']

Block type discriminator.

media MediaRef

Reference to audio media.

transcript str | None

Transcript of audio.

sample_rate int | None

Sample rate in Hz.

channels int | None

Number of audio channels.

Source code in agentflow/state/message_block.py
class AudioBlock(BaseModel):
    """
    Audio content block for messages.

    Attributes:
        type (Literal["audio"]): Block type discriminator.
        media (MediaRef): Reference to audio media.
        transcript (str | None): Transcript of audio.
        sample_rate (int | None): Sample rate in Hz.
        channels (int | None): Number of audio channels.
    """

    type: Literal["audio"] = "audio"
    media: MediaRef
    transcript: str | None = None
    sample_rate: int | None = None
    channels: int | None = None
Attributes
channels class-attribute instance-attribute
channels = None
media instance-attribute
media
sample_rate class-attribute instance-attribute
sample_rate = None
transcript class-attribute instance-attribute
transcript = None
type class-attribute instance-attribute
type = 'audio'
DataBlock

Bases: BaseModel

Data content block for messages.

Attributes:

Name Type Description
type Literal['data']

Block type discriminator.

mime_type str

MIME type of the data.

data_base64 str | None

Base64-encoded data.

media MediaRef | None

Reference to associated media.

Source code in agentflow/state/message_block.py
class DataBlock(BaseModel):
    """
    Data content block for messages.

    Attributes:
        type (Literal["data"]): Block type discriminator.
        mime_type (str): MIME type of the data.
        data_base64 (str | None): Base64-encoded data.
        media (MediaRef | None): Reference to associated media.
    """

    type: Literal["data"] = "data"
    mime_type: str
    data_base64: str | None = None
    media: MediaRef | None = None
Attributes
data_base64 class-attribute instance-attribute
data_base64 = None
media class-attribute instance-attribute
media = None
mime_type instance-attribute
mime_type
type class-attribute instance-attribute
type = 'data'
DocumentBlock

Bases: BaseModel

Document content block for messages.

Attributes:

Name Type Description
type Literal['document']

Block type discriminator.

media MediaRef

Reference to document media.

pages list[int] | None

List of page numbers.

excerpt str | None

Excerpt from the document.

Source code in agentflow/state/message_block.py
class DocumentBlock(BaseModel):
    """
    Document content block for messages.

    Attributes:
        type (Literal["document"]): Block type discriminator.
        media (MediaRef): Reference to document media.
        pages (list[int] | None): List of page numbers.
        excerpt (str | None): Excerpt from the document.
    """

    type: Literal["document"] = "document"
    media: MediaRef
    pages: list[int] | None = None
    excerpt: str | None = None
Attributes
excerpt class-attribute instance-attribute
excerpt = None
media instance-attribute
media
pages class-attribute instance-attribute
pages = None
type class-attribute instance-attribute
type = 'document'
ErrorBlock

Bases: BaseModel

Error content block for messages.

Attributes:

Name Type Description
type Literal['error']

Block type discriminator.

message str

Error message.

code str | None

Error code.

data dict[str, Any] | None

Additional error data.

Source code in agentflow/state/message_block.py
class ErrorBlock(BaseModel):
    """
    Error content block for messages.

    Attributes:
        type (Literal["error"]): Block type discriminator.
        message (str): Error message.
        code (str | None): Error code.
        data (dict[str, Any] | None): Additional error data.
    """

    type: Literal["error"] = "error"
    message: str
    code: str | None = None
    data: dict[str, Any] | None = None
Attributes
code class-attribute instance-attribute
code = None
data class-attribute instance-attribute
data = None
message instance-attribute
message
type class-attribute instance-attribute
type = 'error'
ImageBlock

Bases: BaseModel

Image content block for messages.

Attributes:

Name Type Description
type Literal['image']

Block type discriminator.

media MediaRef

Reference to image media.

alt_text str | None

Alternative text for accessibility.

bbox list[float] | None

Bounding box coordinates [x1, y1, x2, y2].

Source code in agentflow/state/message_block.py
class ImageBlock(BaseModel):
    """
    Image content block for messages.

    Attributes:
        type (Literal["image"]): Block type discriminator.
        media (MediaRef): Reference to image media.
        alt_text (str | None): Alternative text for accessibility.
        bbox (list[float] | None): Bounding box coordinates [x1, y1, x2, y2].
    """

    type: Literal["image"] = "image"
    media: MediaRef
    alt_text: str | None = None
    bbox: list[float] | None = None  # [x1,y1,x2,y2] if applicable
Attributes
alt_text class-attribute instance-attribute
alt_text = None
bbox class-attribute instance-attribute
bbox = None
media instance-attribute
media
type class-attribute instance-attribute
type = 'image'
MediaRef

Bases: BaseModel

Reference to media content (image/audio/video/document/data).

Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

Attributes:

Name Type Description
kind Literal['url', 'file_id', 'data']

Type of reference.

url str | None

URL to media content.

file_id str | None

Provider-managed file ID.

data_base64 str | None

Base64-encoded data (small payloads only).

mime_type str | None

MIME type of the media.

size_bytes int | None

Size in bytes.

sha256 str | None

SHA256 hash of the media.

filename str | None

Filename of the media.

width int | None

Image width (if applicable).

height int | None

Image height (if applicable).

duration_ms int | None

Duration in milliseconds (if applicable).

page int | None

Page number (if applicable).

Source code in agentflow/state/message_block.py
class MediaRef(BaseModel):
    """
    Reference to media content (image/audio/video/document/data).

    Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

    Attributes:
        kind (Literal["url", "file_id", "data"]): Type of reference.
        url (str | None): URL to media content.
        file_id (str | None): Provider-managed file ID.
        data_base64 (str | None): Base64-encoded data (small payloads only).
        mime_type (str | None): MIME type of the media.
        size_bytes (int | None): Size in bytes.
        sha256 (str | None): SHA256 hash of the media.
        filename (str | None): Filename of the media.
        width (int | None): Image width (if applicable).
        height (int | None): Image height (if applicable).
        duration_ms (int | None): Duration in milliseconds (if applicable).
        page (int | None): Page number (if applicable).
    """

    kind: Literal["url", "file_id", "data"] = "url"
    url: str | None = None  # http(s) or data: URL
    file_id: str | None = None  # provider-managed ID (e.g., OpenAI/Gemini)
    data_base64: str | None = None  # small payloads only
    mime_type: str | None = None
    size_bytes: int | None = None
    sha256: str | None = None
    filename: str | None = None
    # Media-specific hints
    width: int | None = None
    height: int | None = None
    duration_ms: int | None = None
    page: int | None = None
Attributes
data_base64 class-attribute instance-attribute
data_base64 = None
duration_ms class-attribute instance-attribute
duration_ms = None
file_id class-attribute instance-attribute
file_id = None
filename class-attribute instance-attribute
filename = None
height class-attribute instance-attribute
height = None
kind class-attribute instance-attribute
kind = 'url'
mime_type class-attribute instance-attribute
mime_type = None
page class-attribute instance-attribute
page = None
sha256 class-attribute instance-attribute
sha256 = None
size_bytes class-attribute instance-attribute
size_bytes = None
url class-attribute instance-attribute
url = None
width class-attribute instance-attribute
width = None
ReasoningBlock

Bases: BaseModel

Reasoning content block for messages.

Attributes:

Name Type Description
type Literal['reasoning']

Block type discriminator.

summary str

Summary of reasoning.

details list[str] | None

Detailed reasoning steps.

Source code in agentflow/state/message_block.py
class ReasoningBlock(BaseModel):
    """
    Reasoning content block for messages.

    Attributes:
        type (Literal["reasoning"]): Block type discriminator.
        summary (str): Summary of reasoning.
        details (list[str] | None): Detailed reasoning steps.
    """

    type: Literal["reasoning"] = "reasoning"
    summary: str
    details: list[str] | None = None
Attributes
details class-attribute instance-attribute
details = None
summary instance-attribute
summary
type class-attribute instance-attribute
type = 'reasoning'
RemoteToolCallBlock

Bases: BaseModel

Remote Tool call content block for messages.

Attributes:

Name Type Description
type Literal['remote_tool_call']

Block type discriminator.

id str

Tool call ID.

name str

Tool name.

args dict[str, Any]

Arguments for the tool call.

tool_type str

Type of tool; defaults to "remote".

Source code in agentflow/state/message_block.py
class RemoteToolCallBlock(BaseModel):
    """
    Remote Tool call content block for messages.

    Attributes:
        type (Literal["remote_tool_call"]): Block type discriminator.
        id (str): Tool call ID.
        name (str): Tool name.
        args (dict[str, Any]): Arguments for the tool call.
        tool_type (str): Type of tool; defaults to "remote".
    """

    type: Literal["remote_tool_call"] = "remote_tool_call"
    id: str
    name: str
    args: dict[str, Any] = Field(default_factory=dict)
    tool_type: str = "remote"
Attributes
args class-attribute instance-attribute
args = Field(default_factory=dict)
id instance-attribute
id
name instance-attribute
name
tool_type class-attribute instance-attribute
tool_type = 'remote'
type class-attribute instance-attribute
type = 'remote_tool_call'
TextBlock

Bases: BaseModel

Text content block for messages.

Attributes:

Name Type Description
type Literal['text']

Block type discriminator.

text str

Text content.

annotations list[AnnotationRef]

List of annotation references.

Source code in agentflow/state/message_block.py
class TextBlock(BaseModel):
    """
    Text content block for messages.

    Attributes:
        type (Literal["text"]): Block type discriminator.
        text (str): Text content.
        annotations (list[AnnotationRef]): List of annotation references.
    """

    type: Literal["text"] = "text"
    text: str
    annotations: list[AnnotationRef] = Field(default_factory=list)
Attributes
annotations class-attribute instance-attribute
annotations = Field(default_factory=list)
text instance-attribute
text
type class-attribute instance-attribute
type = 'text'
ToolCallBlock

Bases: BaseModel

Tool call content block for messages.

Attributes:

Name Type Description
type Literal['tool_call']

Block type discriminator.

id str

Tool call ID.

name str

Tool name.

args dict[str, Any]

Arguments for the tool call.

tool_type str | None

Type of tool (e.g., web_search, file_search).

Source code in agentflow/state/message_block.py
class ToolCallBlock(BaseModel):
    """
    Tool call content block for messages.

    Attributes:
        type (Literal["tool_call"]): Block type discriminator.
        id (str): Tool call ID.
        name (str): Tool name.
        args (dict[str, Any]): Arguments for the tool call.
        tool_type (str | None): Type of tool (e.g., web_search, file_search).
    """

    type: Literal["tool_call"] = "tool_call"
    id: str
    name: str
    args: dict[str, Any] = Field(default_factory=dict)
    tool_type: str | None = None  # e.g., web_search, file_search, computer_use
Attributes
args class-attribute instance-attribute
args = Field(default_factory=dict)
id instance-attribute
id
name instance-attribute
name
tool_type class-attribute instance-attribute
tool_type = None
type class-attribute instance-attribute
type = 'tool_call'
ToolResultBlock

Bases: BaseModel

Tool result content block for messages.

Attributes:

Name Type Description
type Literal['tool_result']

Block type discriminator.

call_id str

Tool call ID.

output Any

Output from the tool (str, dict, MediaRef, or list of blocks).

is_error bool

Whether the result is an error.

status Literal['completed', 'failed'] | None

Status of the tool call.

Source code in agentflow/state/message_block.py
class ToolResultBlock(BaseModel):
    """
    Tool result content block for messages.

    Attributes:
        type (Literal["tool_result"]): Block type discriminator.
        call_id (str): Tool call ID.
        output (Any): Output from the tool (str, dict, MediaRef, or list of blocks).
        is_error (bool): Whether the result is an error.
        status (Literal["completed", "failed"] | None): Status of the tool call.
    """

    type: Literal["tool_result"] = "tool_result"
    call_id: str
    output: Any = None  # str | dict | MediaRef | list[ContentBlock-like]
    is_error: bool = False
    status: Literal["completed", "failed"] | None = None
Attributes
call_id instance-attribute
call_id
is_error class-attribute instance-attribute
is_error = False
output class-attribute instance-attribute
output = None
status class-attribute instance-attribute
status = None
type class-attribute instance-attribute
type = 'tool_result'
VideoBlock

Bases: BaseModel

Video content block for messages.

Attributes:

Name Type Description
type Literal['video']

Block type discriminator.

media MediaRef

Reference to video media.

thumbnail MediaRef | None

Reference to thumbnail image.

Source code in agentflow/state/message_block.py
class VideoBlock(BaseModel):
    """
    Video content block for messages.

    Attributes:
        type (Literal["video"]): Block type discriminator.
        media (MediaRef): Reference to video media.
        thumbnail (MediaRef | None): Reference to thumbnail image.
    """

    type: Literal["video"] = "video"
    media: MediaRef
    thumbnail: MediaRef | None = None
Attributes
media instance-attribute
media
thumbnail class-attribute instance-attribute
thumbnail = None
type class-attribute instance-attribute
type = 'video'

message_context_manager

Message context management for agent state in TAF.

This module provides MessageContextManager, which trims and manages the message history (context) for agent interactions, ensuring efficient context window usage.

Classes:

Name Description
MessageContextManager

Manages the context field for AI interactions.

Attributes:

Name Type Description
S

TypeVar bound to AgentState.

logger

Module-level logger.

Attributes

S module-attribute
S = TypeVar('S', bound=AgentState)
logger module-attribute
logger = getLogger(__name__)

Classes

MessageContextManager

Bases: BaseContextManager[S]

Manages the context field for AI interactions.

This class trims the context (message history) based on a maximum number of user messages, ensuring the first message (usually a system prompt) is always preserved. Optionally removes tool-related messages (AI messages with tool calls and tool result messages). Generic over AgentState or its subclasses.

Methods:

Name Description
__init__

Initialize the MessageContextManager.

atrim_context

Asynchronous version of trim_context.

trim_context

Trim the context in the given AgentState based on the maximum number of user messages.

Attributes:

Name Type Description
max_messages
remove_tool_msgs
Source code in agentflow/state/message_context_manager.py
class MessageContextManager(BaseContextManager[S]):
    """
    Manages the context field for AI interactions.

    This class trims the context (message history) based on a maximum number of user messages,
    ensuring the first message (usually a system prompt) is always preserved.
    Optionally removes tool-related messages (AI messages with tool calls and tool result messages).
    Generic over AgentState or its subclasses.
    """

    def __init__(self, max_messages: int = 10, remove_tool_msgs: bool = False) -> None:
        """
        Initialize the MessageContextManager.

        Args:
            max_messages (int): Maximum number of
                user messages to keep in context. Default is 10.
            remove_tool_msgs (bool): Whether to remove tool messages from context.
                Default is False.
        """
        self.max_messages = max_messages
        self.remove_tool_msgs = remove_tool_msgs
        logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)

    def _trim(self, messages: list[Message]) -> list[Message] | None:
        """
        Trim messages keeping system messages and most recent user messages.

        Returns None if no trimming is needed, otherwise returns the trimmed list.
        """
        # check context is empty
        if not messages:
            logger.debug("No messages to trim; context is empty")
            return None

        # First, remove tool messages if requested
        if self.remove_tool_msgs:
            messages = remove_tool_messages(messages)
            logger.debug("Removed tool messages, %d messages remaining", len(messages))

        # Count user messages
        user_message_count = sum(1 for msg in messages if msg.role == "user")

        if user_message_count <= self.max_messages:
            # Check if we removed tool messages but no trimming needed
            if self.remove_tool_msgs:
                # Return the filtered messages even if count is within limits
                return messages
            # no trimming needed
            logger.debug(
                "No trimming needed; context is within limits (%d user messages)",
                user_message_count,
            )
            return None

        # Separate system messages (usually at the beginning)
        system_messages = [msg for msg in messages if msg.role == "system"]
        non_system_messages = [msg for msg in messages if msg.role != "system"]

        # Find the index of the oldest user message to keep
        user_count = 0
        start_index = len(non_system_messages)

        # Iterate from the end to find the position to start keeping messages
        for i in range(len(non_system_messages) - 1, -1, -1):
            msg = non_system_messages[i]
            if msg.role == "user":
                user_count += 1
                if user_count == self.max_messages:
                    start_index = i
                    break

        # Keep messages from start_index onwards
        final_non_system = non_system_messages[start_index:]

        # Combine system messages (at start) with trimmed conversation
        trimmed_messages = system_messages + final_non_system

        logger.debug(
            "Trimmed from %d to %d messages (%d user messages kept)",
            len(messages),
            len(trimmed_messages),
            self.max_messages,
        )

        return trimmed_messages

    def trim_context(self, state: S) -> S:
        """
        Trim the context in the given AgentState based on the maximum number of user messages.

        The first message (typically a system prompt) is always preserved. Only the most recent
        user messages up to `max_messages` are kept, along with the first message.

        If `remove_tool_msgs` is True, also removes:
        - AI messages that contain tool calls (intermediate tool-calling messages)
        - Tool result messages (role="tool")

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state

    async def atrim_context(self, state: S) -> S:
        """
        Asynchronous version of trim_context.

        If `remove_tool_msgs` is True, also removes:
        - AI messages that contain tool calls (intermediate tool-calling messages)
        - Tool result messages (role="tool")

        Args:
            state (AgentState): The agent state containing the context to trim.

        Returns:
            S: The updated agent state with trimmed context.
        """
        messages = state.context
        trimmed_messages = self._trim(messages)
        if trimmed_messages is not None:
            state.context = trimmed_messages
        return state
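For illustration, the trimming algorithm can be exercised in isolation with a hypothetical Msg stand-in (the real Message class also carries content blocks, metadata, and token usage). This sketch mirrors the documented _trim logic without tool-message removal:

```python
from dataclasses import dataclass

@dataclass
class Msg:  # minimal stand-in for agentflow's Message
    role: str
    text: str

def trim(messages, max_messages=10):
    """Mirror of MessageContextManager._trim, without tool-message removal."""
    if sum(1 for m in messages if m.role == "user") <= max_messages:
        return None  # no trimming needed
    system = [m for m in messages if m.role == "system"]
    rest = [m for m in messages if m.role != "system"]
    # Walk backwards until max_messages user messages have been seen.
    count, start = 0, len(rest)
    for i in range(len(rest) - 1, -1, -1):
        if rest[i].role == "user":
            count += 1
            if count == max_messages:
                start = i
                break
    return system + rest[start:]

history = [Msg("system", "You are helpful")]
for n in range(4):
    history += [Msg("user", f"q{n}"), Msg("assistant", f"a{n}")]

trimmed = trim(history, max_messages=2)
print([m.text for m in trimmed])
# system prompt survives; only the last two user turns (and their replies) remain
```

Note that the reducer returns None when nothing changed, which lets trim_context leave state.context untouched in the common case.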
Attributes
max_messages instance-attribute
max_messages = max_messages
remove_tool_msgs instance-attribute
remove_tool_msgs = remove_tool_msgs
Functions
__init__
__init__(max_messages=10, remove_tool_msgs=False)

Initialize the MessageContextManager.

Parameters:

Name Type Description Default
max_messages int

Maximum number of user messages to keep in context. Default is 10.

10
remove_tool_msgs bool

Whether to remove tool messages from context. Default is False.

False
Source code in agentflow/state/message_context_manager.py
def __init__(self, max_messages: int = 10, remove_tool_msgs: bool = False) -> None:
    """
    Initialize the MessageContextManager.

    Args:
        max_messages (int): Maximum number of
            user messages to keep in context. Default is 10.
        remove_tool_msgs (bool): Whether to remove tool messages from context.
            Default is False.
    """
    self.max_messages = max_messages
    self.remove_tool_msgs = remove_tool_msgs
    logger.debug("Initialized MessageContextManager with max_messages=%d", max_messages)
atrim_context async
atrim_context(state)

Asynchronous version of trim_context.

If remove_tool_msgs is True, also removes:

- AI messages that contain tool calls (intermediate tool-calling messages)
- Tool result messages (role="tool")

Parameters:

Name Type Description Default
state AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in agentflow/state/message_context_manager.py
async def atrim_context(self, state: S) -> S:
    """
    Asynchronous version of trim_context.

    If `remove_tool_msgs` is True, also removes:
    - AI messages that contain tool calls (intermediate tool-calling messages)
    - Tool result messages (role="tool")

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state
trim_context
trim_context(state)

Trim the context in the given AgentState based on the maximum number of user messages.

System messages (typically the system prompt) are always preserved. Beyond those, only the conversation from the max_messages-th most recent user message onward is kept.

If remove_tool_msgs is True, also removes:

- AI messages that contain tool calls (intermediate tool-calling messages)
- Tool result messages (role="tool")

Parameters:

Name Type Description Default
state AgentState

The agent state containing the context to trim.

required

Returns:

Name Type Description
S S

The updated agent state with trimmed context.

Source code in agentflow/state/message_context_manager.py
def trim_context(self, state: S) -> S:
    """
    Trim the context in the given AgentState based on the maximum number of user messages.

    The first message (typically a system prompt) is always preserved. Only the most recent
    user messages up to `max_messages` are kept, along with the first message.

    If `remove_tool_msgs` is True, also removes:
    - AI messages that contain tool calls (intermediate tool-calling messages)
    - Tool result messages (role="tool")

    Args:
        state (AgentState): The agent state containing the context to trim.

    Returns:
        S: The updated agent state with trimmed context.
    """
    messages = state.context
    trimmed_messages = self._trim(messages)
    if trimmed_messages is not None:
        state.context = trimmed_messages
    return state

Functions

reducers

Reducer utilities for merging and replacing lists and values in agent state.

This module provides generic and message-specific reducers for combining lists, replacing values, and appending items while avoiding duplicates.

Functions:

Name Description
add_messages

Adds messages to a list, avoiding duplicates by message_id.

replace_messages

Replaces the entire message list.

append_items

Appends items to a list, avoiding duplicates by id.

replace_value

Replaces a value with another.

Classes

Functions

add_messages
add_messages(left, right)

Adds messages to the list, avoiding duplicates by message_id.

Parameters:

Name Type Description Default
left
list[Message]

Existing list of messages.

required
right
list[Message]

New messages to add.

required

Returns:

Type Description
list[Message]

list[Message]: Combined list with unique messages.

Example

>>> add_messages([msg1], [msg2, msg1])
[msg1, msg2]

Source code in agentflow/state/reducers.py
def add_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Adds messages to the list, avoiding duplicates by message_id.

    Args:
        left (list[Message]): Existing list of messages.
        right (list[Message]): New messages to add.

    Returns:
        list[Message]: Combined list with unique messages.

    Example:
        >>> add_messages([msg1], [msg2, msg1])
        [msg1, msg2]
    """
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids and msg.delta is False]
    return left + right
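A self-contained sketch of the same merge rule, using a hypothetical Msg stand-in with only the message_id and delta fields the reducer inspects, shows both the deduplication and the exclusion of streaming delta chunks:

```python
from dataclasses import dataclass

@dataclass
class Msg:  # stand-in for Message; only the fields add_messages reads
    message_id: str
    delta: bool = False

def add_messages(left, right):
    left_ids = {m.message_id for m in left}
    # keep only new, fully-formed (non-delta) messages
    right = [m for m in right if m.message_id not in left_ids and m.delta is False]
    return left + right

m1, m2 = Msg("m1"), Msg("m2")
chunk = Msg("m3", delta=True)  # streaming delta, dropped by the reducer
merged = add_messages([m1], [m2, m1, chunk])
print([m.message_id for m in merged])  # m1 kept once, delta chunk excluded
```

The delta check means partial streaming chunks never accumulate in state; only the final, complete message with a given message_id is merged.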
append_items
append_items(left, right)

Appends items to a list, avoiding duplicates by item.id.

Parameters:

Name Type Description Default
left
list

Existing list of items (must have .id attribute).

required
right
list

New items to add.

required

Returns:

Name Type Description
list list

Combined list with unique items.

Example

>>> append_items([item1], [item2, item1])
[item1, item2]

Source code in agentflow/state/reducers.py
def append_items(left: list, right: list) -> list:
    """
    Appends items to a list, avoiding duplicates by item.id.

    Args:
        left (list): Existing list of items (must have .id attribute).
        right (list): New items to add.

    Returns:
        list: Combined list with unique items.

    Example:
        >>> append_items([item1], [item2, item1])
        [item1, item2]
    """
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right
remove_tool_messages
remove_tool_messages(messages)

Remove COMPLETED tool interaction sequences from the message list.

A tool sequence is only removed if it's COMPLETE:

1. AI message with tool_calls (triggering tools)
2. One or more tool result messages (role="tool")
3. AI message WITHOUT tool_calls (final response using tool results)

If a sequence is incomplete (e.g., tool call made but no final AI response yet), ALL messages are kept to maintain conversation continuity.

Edge cases handled:

- Incomplete sequences (AI called tool, waiting for results): Keep everything
- Partial sequences (AI called tool, got results, but no final response): Keep everything
- Multiple tool calls in one AI message: Handles correctly
- Consecutive tool sequences: Each evaluated independently

Parameters:

Name Type Description Default
messages
list[Message]

List of messages to filter.

required

Returns:

Type Description
list[Message]

list[Message]: Filtered list with only COMPLETED tool sequences removed.

Example

Complete sequence (will be cleaned):

>>> messages = [user_msg, ai_with_tools, tool_result, ai_final]
>>> remove_tool_messages(messages)
[user_msg, ai_final]

Incomplete sequence (will be kept):

>>> messages = [user_msg, ai_with_tools]
>>> remove_tool_messages(messages)
[user_msg, ai_with_tools]  # Keep everything - sequence incomplete!

Partial sequence (will be kept):

>>> messages = [user_msg, ai_with_tools, tool_result]
>>> remove_tool_messages(messages)
[user_msg, ai_with_tools, tool_result]  # Keep - no final AI response!

Source code in agentflow/state/reducers.py
def remove_tool_messages(messages: list[Message]) -> list[Message]:
    """
    Remove COMPLETED tool interaction sequences from the message list.

    A tool sequence is only removed if it's COMPLETE:
    1. AI message with tool_calls (triggering tools)
    2. One or more tool result messages (role="tool")
    3. AI message WITHOUT tool_calls (final response using tool results)

    If a sequence is incomplete (e.g., tool call made but no final AI response yet),
    ALL messages are kept to maintain conversation continuity.

    Edge cases handled:
    - Incomplete sequences (AI called tool, waiting for results): Keep everything
    - Partial sequences (AI called tool, got results, but no final response): Keep everything
    - Multiple tool calls in one AI message: Handles correctly
    - Consecutive tool sequences: Each evaluated independently

    Args:
        messages (list[Message]): List of messages to filter.

    Returns:
        list[Message]: Filtered list with only COMPLETED tool sequences removed.

    Example:
        Complete sequence (will be cleaned):
        >>> messages = [user_msg, ai_with_tools, tool_result, ai_final]
        >>> remove_tool_messages(messages)
        [user_msg, ai_final]

        Incomplete sequence (will be kept):
        >>> messages = [user_msg, ai_with_tools]
        >>> remove_tool_messages(messages)
        [user_msg, ai_with_tools]  # Keep everything - sequence incomplete!

        Partial sequence (will be kept):
        >>> messages = [user_msg, ai_with_tools, tool_result]
        >>> remove_tool_messages(messages)
        [user_msg, ai_with_tools, tool_result]  # Keep - no final AI response!
    """
    if not messages:
        return messages

    # Step 1: Identify indices to remove by scanning for COMPLETE sequences
    indices_to_remove = set()
    i = 0

    while i < len(messages):
        msg = messages[i]

        # Look for AI message with tool calls (potential sequence start)
        if msg.role == "assistant" and msg.tools_calls:
            sequence_start = i
            i += 1

            # Collect all following tool result messages
            tool_result_indices = []
            while i < len(messages) and messages[i].role == "tool":
                tool_result_indices.append(i)
                i += 1

            # Check if there's a final AI response (without tool_calls)
            has_final_response = (
                i < len(messages)
                and messages[i].role == "assistant"
                and not messages[i].tools_calls
            )
            if has_final_response:
                # COMPLETE SEQUENCE FOUND!
                # Mark AI with tool_calls and all tool results for removal
                indices_to_remove.add(sequence_start)
                indices_to_remove.update(tool_result_indices)
                # Note: We keep the final AI response (index i)
                i += 1  # Move past the final AI response
            else:
                # INCOMPLETE SEQUENCE - keep everything
                # Don't add anything to indices_to_remove
                # i is already positioned correctly (at next message or end)
                pass
        else:
            i += 1

    # Step 2: Build filtered list excluding marked indices
    return [msg for idx, msg in enumerate(messages) if idx not in indices_to_remove]
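The complete-vs-incomplete distinction can be checked with role/tool-call stand-ins (a hypothetical Msg dataclass mirroring the role and tools_calls fields the function reads):

```python
from dataclasses import dataclass, field

@dataclass
class Msg:  # stand-in carrying only the fields remove_tool_messages inspects
    role: str
    name: str
    tools_calls: list = field(default_factory=list)

def remove_tool_messages(messages):
    # Same scan as the documented source: drop only COMPLETE sequences.
    drop, i = set(), 0
    while i < len(messages):
        m = messages[i]
        if m.role == "assistant" and m.tools_calls:
            start = i
            i += 1
            results = []
            while i < len(messages) and messages[i].role == "tool":
                results.append(i)
                i += 1
            if (i < len(messages)
                    and messages[i].role == "assistant"
                    and not messages[i].tools_calls):
                drop.add(start)        # AI message that triggered the tools
                drop.update(results)   # its tool result messages
                i += 1                 # keep the final AI response
        else:
            i += 1
    return [m for idx, m in enumerate(messages) if idx not in drop]

user = Msg("user", "user")
ai_tools = Msg("assistant", "ai_with_tools", tools_calls=[{"name": "search"}])
tool_res = Msg("tool", "tool_result")
ai_final = Msg("assistant", "ai_final")

complete = remove_tool_messages([user, ai_tools, tool_res, ai_final])
partial = remove_tool_messages([user, ai_tools, tool_res])
print([m.name for m in complete])  # tool round-trip removed, final answer kept
print([m.name for m in partial])   # incomplete sequence: everything kept
```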
replace_messages
replace_messages(left, right)

Replaces the entire message list with a new one.

Parameters:

Name Type Description Default
left
list[Message]

Existing list of messages (ignored).

required
right
list[Message]

New list of messages.

required

Returns:

Type Description
list[Message]

list[Message]: The new message list.

Example

>>> replace_messages([msg1], [msg2])
[msg2]

Source code in agentflow/state/reducers.py
def replace_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Replaces the entire message list with a new one.

    Args:
        left (list[Message]): Existing list of messages (ignored).
        right (list[Message]): New list of messages.

    Returns:
        list[Message]: The new message list.

    Example:
        >>> replace_messages([msg1], [msg2])
        [msg2]
    """
    return right
replace_value
replace_value(left, right)

Replaces a value with another.

Parameters:

Name Type Description Default
left

Existing value (ignored).

required
right

New value to use.

required

Returns:

Name Type Description
Any

The new value.

Example

>>> replace_value(1, 2)
2

Source code in agentflow/state/reducers.py
def replace_value(left, right):
    """
    Replaces a value with another.

    Args:
        left: Existing value (ignored).
        right: New value to use.

    Returns:
        Any: The new value.

    Example:
        >>> replace_value(1, 2)
        2
    """
    return right

stream_chunks

Stream chunk primitives for unified streaming data handling.

This module provides a unified StreamChunk class that can encapsulate different types of streaming data (Messages, EventModels, etc.) in a type-safe manner. This enables clean separation between conversation content and execution state while providing a consistent interface for streaming consumers.

Classes:

Name Description
StreamChunk

Unified wrapper for streaming data with type discrimination.

Classes

StreamChunk

Bases: BaseModel

Unified wrapper for different types of streaming data.

This class provides a single interface for handling various streaming chunk types (messages, events, state updates, errors) with type-safe discrimination.

Attributes:

Name Type Description
event

The type of streaming chunk.

data dict | None

The actual chunk data (Message, EventModel, dict, etc.).

metadata dict | None

Optional additional metadata for the chunk.

Classes:

Name Description
Config

Pydantic configuration for StreamChunk.

Source code in agentflow/state/stream_chunks.py
class StreamChunk(BaseModel):
    """
    Unified wrapper for different types of streaming data.

    This class provides a single interface for handling various streaming chunk types
    (messages, events, state updates, errors) with type-safe discrimination.

    Attributes:
        event: The type of streaming chunk.
        data: The actual chunk data (Message, EventModel, dict, etc.).
        metadata: Optional additional metadata for the chunk.
    """

    event: StreamEvent = StreamEvent.MESSAGE
    # data holders for different chunk types
    message: Message | None = None
    state: AgentState | None = None
    # Placeholder for other chunk types
    data: dict | None = None

    # Optional identifiers
    thread_id: str | None = None
    run_id: str | None = None
    # Optional metadata
    metadata: dict | None = None
    timestamp: float = Field(
        default_factory=lambda: datetime.now().timestamp(),
        description="UNIX timestamp of when chunk was created",
    )

    class Config:
        """Pydantic configuration for EventModel.

        Attributes:
            use_enum_values: Output enums as strings.
        """

        use_enum_values = True
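Consumers typically dispatch on the event field. A minimal sketch with a hypothetical dataclass stand-in for StreamChunk (the real class is a pydantic BaseModel with more fields) and the StreamEvent enum as documented below:

```python
import enum
from dataclasses import dataclass
from typing import Any

class StreamEvent(str, enum.Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"

@dataclass
class Chunk:  # stand-in for StreamChunk, with only the fields used here
    event: StreamEvent = StreamEvent.MESSAGE
    message: Any = None
    data: Any = None

def handle(chunk):
    # type-safe discrimination: one branch per event kind
    if chunk.event is StreamEvent.MESSAGE:
        return f"message: {chunk.message}"
    if chunk.event is StreamEvent.ERROR:
        return f"error: {chunk.data}"
    return f"other: {chunk.event.value}"

print(handle(Chunk(message="hello")))
print(handle(Chunk(event=StreamEvent.ERROR, data={"detail": "boom"})))
```

Because StreamEvent subclasses str, event values compare and serialize as plain strings, which matches the use_enum_values Pydantic setting on the real class.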
Attributes
data class-attribute instance-attribute
data = None
event class-attribute instance-attribute
event = MESSAGE
message class-attribute instance-attribute
message = None
metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
state class-attribute instance-attribute
state = None
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=lambda: datetime.now().timestamp(), description='UNIX timestamp of when chunk was created')
Classes
Config

Pydantic configuration for StreamChunk.

Attributes:

Name Type Description
use_enum_values

Output enums as strings.

Source code in agentflow/state/stream_chunks.py
class Config:
    """Pydantic configuration for EventModel.

    Attributes:
        use_enum_values: Output enums as strings.
    """

    use_enum_values = True
Attributes
use_enum_values class-attribute instance-attribute
use_enum_values = True
StreamEvent

Bases: str, Enum

Attributes:

Name Type Description
ERROR
MESSAGE
STATE
UPDATES
Source code in agentflow/state/stream_chunks.py
class StreamEvent(str, enum.Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"
Attributes
ERROR class-attribute instance-attribute
ERROR = 'error'
MESSAGE class-attribute instance-attribute
MESSAGE = 'message'
STATE class-attribute instance-attribute
STATE = 'state'
UPDATES class-attribute instance-attribute
UPDATES = 'updates'