
Utils

Unified utility exports for PyAgenity agent graphs.

This module re-exports core utility symbols for agent graph construction, message handling, callback management, reducers, and constants. Import from this module for a stable, unified surface of agent utilities.

Main Exports
  • Message and content blocks (Message, TextBlock, ToolCallBlock, etc.)
  • Callback management (CallbackManager, register_before_invoke, etc.)
  • Command and callable utilities (Command, call_sync_or_async)
  • Reducers (add_messages, replace_messages, append_items, replace_value)
  • Constants (START, END, ExecutionState, etc.)
  • Converter (convert_messages)

Modules:

  • background_task_manager: Background task manager for async operations in PyAgenity.
  • callable_utils: Utilities for calling sync or async functions in PyAgenity.
  • callbacks: Callback system for PyAgenity.
  • command: Command API for AgentGraph in PyAgenity.
  • constants: Constants and enums for PyAgenity agent graph execution and messaging.
  • converter: Message conversion utilities for PyAgenity agent graphs.
  • id_generator: ID Generator Module
  • logging: Centralized logging configuration for PyAgenity.
  • message: Message and content block primitives for agent graphs.
  • metrics: Lightweight metrics instrumentation utilities.
  • reducers: Reducer utilities for merging and replacing lists and values in agent state.
  • thread_info: Thread metadata and status tracking for agent graphs.
  • thread_name_generator: Thread name generation utilities for AI agent conversations.

Classes:

  • AfterInvokeCallback: Abstract base class for after_invoke callbacks.
  • AnnotationBlock: Annotation content block for messages.
  • BeforeInvokeCallback: Abstract base class for before_invoke callbacks.
  • CallbackContext: Context information passed to callbacks.
  • CallbackManager: Manages registration and execution of callbacks for different invocation types.
  • Command: Command object that combines state updates with control flow.
  • DataBlock: Data content block for messages.
  • ErrorBlock: Error content block for messages.
  • ExecutionState: Graph execution states for agent workflows.
  • InvocationType: Types of invocations that can trigger callbacks.
  • Message: Represents a message in a conversation, including content, role, metadata, and token usage.
  • OnErrorCallback: Abstract base class for on_error callbacks.
  • ReasoningBlock: Reasoning content block for messages.
  • ResponseGranularity: Response granularity options for agent graph outputs.
  • StorageLevel: Message storage levels for agent state persistence.
  • TextBlock: Text content block for messages.
  • ThreadInfo: Metadata and status for a thread in agent execution.
  • TokenUsages: Tracks token usage statistics for a message or model response.
  • ToolCallBlock: Tool call content block for messages.
  • ToolResultBlock: Tool result content block for messages.

Functions:

  • add_messages: Adds messages to the list, avoiding duplicates by message_id.
  • append_items: Appends items to a list, avoiding duplicates by item.id.
  • call_sync_or_async: Call a function that may be sync or async, returning its result.
  • convert_messages: Convert system prompts, agent state, and extra messages to a list of dicts for
  • register_after_invoke: Register an after_invoke callback on the global callback manager.
  • register_before_invoke: Register a before_invoke callback on the global callback manager.
  • register_on_error: Register an on_error callback on the global callback manager.
  • replace_messages: Replaces the entire message list with a new one.
  • replace_value: Replaces a value with another.
  • run_coroutine: Run an async coroutine from a sync context safely.

Attributes:

  • ContentBlock
  • END (Literal['__end__'])
  • START (Literal['__start__'])
  • default_callback_manager

Attributes

ContentBlock module-attribute

ContentBlock = Annotated[Union[TextBlock, ImageBlock, AudioBlock, VideoBlock, DocumentBlock, DataBlock, ToolCallBlock, ToolResultBlock, ReasoningBlock, AnnotationBlock, ErrorBlock], Field(discriminator='type')]

END module-attribute

END = '__end__'

START module-attribute

START = '__start__'

__all__ module-attribute

__all__ = ['END', 'START', 'AfterInvokeCallback', 'AnnotationBlock', 'BeforeInvokeCallback', 'CallbackContext', 'CallbackManager', 'Command', 'ContentBlock', 'DataBlock', 'ErrorBlock', 'ExecutionState', 'InvocationType', 'Message', 'OnErrorCallback', 'ReasoningBlock', 'ResponseGranularity', 'StorageLevel', 'TextBlock', 'ThreadInfo', 'TokenUsages', 'ToolCallBlock', 'ToolResultBlock', 'add_messages', 'append_items', 'call_sync_or_async', 'convert_messages', 'default_callback_manager', 'register_after_invoke', 'register_before_invoke', 'register_on_error', 'replace_messages', 'replace_value', 'run_coroutine']

default_callback_manager module-attribute

default_callback_manager = CallbackManager()

Classes

AfterInvokeCallback

Bases: ABC

Abstract base class for after_invoke callbacks.

Called after the AI model, tool, or MCP function is invoked. Allows for output validation and modification.

Methods:

  • __call__: Execute the after_invoke callback.

Source code in pyagenity/utils/callbacks.py
class AfterInvokeCallback[T, R](ABC):
    """Abstract base class for after_invoke callbacks.

    Called after the AI model, tool, or MCP function is invoked.
    Allows for output validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
        """Execute the after_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The original input data that was sent
            output_data: The output data returned from the invocation

        Returns:
            Modified output data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data, output_data)

Execute the after_invoke callback.

Parameters:

  • context (CallbackContext, required): Context information about the invocation
  • input_data (T, required): The original input data that was sent
  • output_data (Any, required): The output data returned from the invocation

Returns:

  • Any | R: Modified output data (can be same type or different type)

Raises:

  • Exception: If validation fails or modification cannot be performed

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
    """Execute the after_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The original input data that was sent
        output_data: The output data returned from the invocation

    Returns:
        Modified output data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
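
As a rough illustration of the contract above, the sketch below subclasses a local stand-in for AfterInvokeCallback rather than importing the library, so it runs on its own. The stand-in copies only the abstract signature shown in the source (without the generic parameters); `TruncateOutput` and its `max_chars` argument are hypothetical names introduced for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class AfterInvokeCallback(ABC):
    """Local stand-in mirroring the abstract signature documented above."""

    @abstractmethod
    async def __call__(self, context: Any, input_data: Any, output_data: Any) -> Any: ...


class TruncateOutput(AfterInvokeCallback):
    """Hypothetical callback: validates the output, then shortens long strings."""

    def __init__(self, max_chars: int) -> None:
        self.max_chars = max_chars

    async def __call__(self, context: Any, input_data: Any, output_data: Any) -> Any:
        if not isinstance(output_data, str):
            # Raising signals a validation failure to whoever runs the callback.
            raise TypeError("expected a string output")
        return output_data[: self.max_chars]


# The context is unused in this sketch, so None is passed in its place.
truncated = asyncio.run(TruncateOutput(5)(None, "prompt", "hello world"))
```

The returned value replaces the output, which is why the callback returns the (possibly modified) data instead of mutating it in place.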

AnnotationBlock

Bases: BaseModel

Annotation content block for messages.

Attributes:

  • type (Literal['annotation']): Block type discriminator.
  • kind (Literal['citation', 'note']): Kind of annotation.
  • refs (list[AnnotationRef]): List of annotation references.
  • spans (list[tuple[int, int]] | None): Spans covered by the annotation.

Source code in pyagenity/utils/message.py
class AnnotationBlock(BaseModel):
    """
    Annotation content block for messages.

    Attributes:
        type (Literal["annotation"]): Block type discriminator.
        kind (Literal["citation", "note"]): Kind of annotation.
        refs (list[AnnotationRef]): List of annotation references.
        spans (list[tuple[int, int]] | None): Spans covered by the annotation.
    """

    type: Literal["annotation"] = "annotation"
    kind: Literal["citation", "note"] = "citation"
    refs: list[AnnotationRef] = Field(default_factory=list)
    spans: list[tuple[int, int]] | None = None

Attributes

kind class-attribute instance-attribute
kind = 'citation'
refs class-attribute instance-attribute
refs = Field(default_factory=list)
spans class-attribute instance-attribute
spans = None
type class-attribute instance-attribute
type = 'annotation'

BeforeInvokeCallback

Bases: ABC

Abstract base class for before_invoke callbacks.

Called before the AI model, tool, or MCP function is invoked. Allows for input validation and modification.

Methods:

  • __call__: Execute the before_invoke callback.

Source code in pyagenity/utils/callbacks.py
class BeforeInvokeCallback[T, R](ABC):
    """Abstract base class for before_invoke callbacks.

    Called before the AI model, tool, or MCP function is invoked.
    Allows for input validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
        """Execute the before_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The input data about to be sent to the invocation

        Returns:
            Modified input data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data)

Execute the before_invoke callback.

Parameters:

  • context (CallbackContext, required): Context information about the invocation
  • input_data (T, required): The input data about to be sent to the invocation

Returns:

  • T | R: Modified input data (can be same type or different type)

Raises:

  • Exception: If validation fails or modification cannot be performed

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
    """Execute the before_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The input data about to be sent to the invocation

    Returns:
        Modified input data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
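
A before_invoke callback can both validate and rewrite its input. The following is a minimal sketch under the same assumption as any standalone example here: `BeforeInvokeCallback` is redefined locally as a stand-in with the documented signature, and `StripWhitespace` is a hypothetical subclass, not something the library ships.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class BeforeInvokeCallback(ABC):
    """Local stand-in mirroring the abstract signature documented above."""

    @abstractmethod
    async def __call__(self, context: Any, input_data: Any) -> Any: ...


class StripWhitespace(BeforeInvokeCallback):
    """Hypothetical callback: rejects empty input and trims surrounding whitespace."""

    async def __call__(self, context: Any, input_data: Any) -> Any:
        if not isinstance(input_data, str):
            raise ValueError("input must be a string")
        cleaned = input_data.strip()
        if not cleaned:
            # Validation failure: the invocation should not proceed.
            raise ValueError("input must not be empty")
        return cleaned


cleaned = asyncio.run(StripWhitespace()(None, "  what is 2 + 2?  "))
```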

CallbackContext dataclass

Context information passed to callbacks.

Methods:

  • __init__

Attributes:

  • function_name (str | None)
  • invocation_type (InvocationType)
  • metadata (dict[str, Any] | None)
  • node_name (str)

Source code in pyagenity/utils/callbacks.py
@dataclass
class CallbackContext:
    """Context information passed to callbacks."""

    invocation_type: InvocationType
    node_name: str
    function_name: str | None = None
    metadata: dict[str, Any] | None = None

Attributes

function_name class-attribute instance-attribute
function_name = None
invocation_type instance-attribute
invocation_type
metadata class-attribute instance-attribute
metadata = None
node_name instance-attribute
node_name

Functions

__init__
__init__(invocation_type, node_name, function_name=None, metadata=None)

CallbackManager

Manages registration and execution of callbacks for different invocation types.

Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.

Methods:

  • __init__: Initialize the CallbackManager with empty callback registries.
  • clear_callbacks: Clear callbacks for a specific invocation type or all types.
  • execute_after_invoke: Execute all after_invoke callbacks for the given context.
  • execute_before_invoke: Execute all before_invoke callbacks for the given context.
  • execute_on_error: Execute all on_error callbacks for the given context.
  • get_callback_counts: Get count of registered callbacks by type for debugging.
  • register_after_invoke: Register an after_invoke callback for a specific invocation type.
  • register_before_invoke: Register a before_invoke callback for a specific invocation type.
  • register_on_error: Register an on_error callback for a specific invocation type.

Source code in pyagenity/utils/callbacks.py
class CallbackManager:
    """
    Manages registration and execution of callbacks for different invocation types.

    Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.
    """

    def __init__(self):
        """
        Initialize the CallbackManager with empty callback registries.
        """
        self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }

    def register_before_invoke(
        self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
    ) -> None:
        """
        Register a before_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (BeforeInvokeCallbackType): The callback to register.
        """
        self._before_callbacks[invocation_type].append(callback)

    def register_after_invoke(
        self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
    ) -> None:
        """
        Register an after_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (AfterInvokeCallbackType): The callback to register.
        """
        self._after_callbacks[invocation_type].append(callback)

    def register_on_error(
        self, invocation_type: InvocationType, callback: OnErrorCallbackType
    ) -> None:
        """
        Register an on_error callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (OnErrorCallbackType): The callback to register.
        """
        self._error_callbacks[invocation_type].append(callback)

    async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
        """
        Execute all before_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data to be validated or modified.

        Returns:
            Any: The modified input data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_data = input_data

        for callback in self._before_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, BeforeInvokeCallback):
                    current_data = await callback(context, current_data)
                elif callable(callback):
                    result = callback(context, current_data)
                    if hasattr(result, "__await__"):
                        current_data = await result
                    else:
                        current_data = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_data

    async def execute_after_invoke(
        self, context: CallbackContext, input_data: Any, output_data: Any
    ) -> Any:
        """
        Execute all after_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The original input data sent to the invocation.
            output_data (Any): The output data returned from the invocation.

        Returns:
            Any: The modified output data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_output = output_data

        for callback in self._after_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, AfterInvokeCallback):
                    current_output = await callback(context, input_data, current_output)
                elif callable(callback):
                    result = callback(context, input_data, current_output)
                    if hasattr(result, "__await__"):
                        current_output = await result
                    else:
                        current_output = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_output

    async def execute_on_error(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Message | None:
        """
        Execute all on_error callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data that caused the error.
            error (Exception): The exception that occurred.

        Returns:
            Message | None: Recovery value from callbacks, or None if not handled.
        """
        recovery_value = None

        for callback in self._error_callbacks[context.invocation_type]:
            try:
                result = None
                if isinstance(callback, OnErrorCallback):
                    result = await callback(context, input_data, error)
                elif callable(callback):
                    result = callback(context, input_data, error)
                    if hasattr(result, "__await__"):
                        result = await result  # type: ignore

                if isinstance(result, Message) or result is None:
                    recovery_value = result
            except Exception as exc:
                logger.exception("Error callback failed: %s", exc)
                continue

        return recovery_value

    def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
        """
        Clear callbacks for a specific invocation type or all types.

        Args:
            invocation_type (InvocationType | None): The invocation type to clear, or None for all.
        """
        if invocation_type:
            self._before_callbacks[invocation_type].clear()
            self._after_callbacks[invocation_type].clear()
            self._error_callbacks[invocation_type].clear()
        else:
            for inv_type in InvocationType:
                self._before_callbacks[inv_type].clear()
                self._after_callbacks[inv_type].clear()
                self._error_callbacks[inv_type].clear()

    def get_callback_counts(self) -> dict[str, dict[str, int]]:
        """
        Get count of registered callbacks by type for debugging.

        Returns:
            dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
        """
        return {
            inv_type.value: {
                "before_invoke": len(self._before_callbacks[inv_type]),
                "after_invoke": len(self._after_callbacks[inv_type]),
                "on_error": len(self._error_callbacks[inv_type]),
            }
            for inv_type in InvocationType
        }

Functions

__init__
__init__()

Initialize the CallbackManager with empty callback registries.

Source code in pyagenity/utils/callbacks.py
def __init__(self):
    """
    Initialize the CallbackManager with empty callback registries.
    """
    self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
clear_callbacks
clear_callbacks(invocation_type=None)

Clear callbacks for a specific invocation type or all types.

Parameters:

  • invocation_type (InvocationType | None, default: None): The invocation type to clear, or None for all.

Source code in pyagenity/utils/callbacks.py
def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
    """
    Clear callbacks for a specific invocation type or all types.

    Args:
        invocation_type (InvocationType | None): The invocation type to clear, or None for all.
    """
    if invocation_type:
        self._before_callbacks[invocation_type].clear()
        self._after_callbacks[invocation_type].clear()
        self._error_callbacks[invocation_type].clear()
    else:
        for inv_type in InvocationType:
            self._before_callbacks[inv_type].clear()
            self._after_callbacks[inv_type].clear()
            self._error_callbacks[inv_type].clear()
execute_after_invoke async
execute_after_invoke(context, input_data, output_data)

Execute all after_invoke callbacks for the given context.

Parameters:

  • context (CallbackContext, required): Context information about the invocation.
  • input_data (Any, required): The original input data sent to the invocation.
  • output_data (Any, required): The output data returned from the invocation.

Returns:

  • Any: The modified output data after all callbacks.

Raises:

  • Exception: If any callback fails.

Source code in pyagenity/utils/callbacks.py
async def execute_after_invoke(
    self, context: CallbackContext, input_data: Any, output_data: Any
) -> Any:
    """
    Execute all after_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The original input data sent to the invocation.
        output_data (Any): The output data returned from the invocation.

    Returns:
        Any: The modified output data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_output = output_data

    for callback in self._after_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, AfterInvokeCallback):
                current_output = await callback(context, input_data, current_output)
            elif callable(callback):
                result = callback(context, input_data, current_output)
                if hasattr(result, "__await__"):
                    current_output = await result
                else:
                    current_output = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_output
execute_before_invoke async
execute_before_invoke(context, input_data)

Execute all before_invoke callbacks for the given context.

Parameters:

  • context (CallbackContext, required): Context information about the invocation.
  • input_data (Any, required): The input data to be validated or modified.

Returns:

  • Any: The modified input data after all callbacks.

Raises:

  • Exception: If any callback fails.

Source code in pyagenity/utils/callbacks.py
async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
    """
    Execute all before_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data to be validated or modified.

    Returns:
        Any: The modified input data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_data = input_data

    for callback in self._before_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, BeforeInvokeCallback):
                current_data = await callback(context, current_data)
            elif callable(callback):
                result = callback(context, current_data)
                if hasattr(result, "__await__"):
                    current_data = await result
                else:
                    current_data = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_data
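
The loop above accepts plain functions and coroutine functions alike, and feeds each callback the result of the previous one. The sketch below isolates that chaining behaviour in a simplified stand-in (`run_before_chain` is a hypothetical helper, not a library function); it omits the class-based branch and the error handling to keep the focus on the `__await__` check.

```python
import asyncio
from typing import Any, Callable


async def run_before_chain(
    callbacks: list[Callable[..., Any]], context: Any, data: Any
) -> Any:
    """Simplified stand-in for the loop in execute_before_invoke."""
    current = data
    for callback in callbacks:
        result = callback(context, current)
        # Awaitables (such as coroutines) are awaited; plain values pass through.
        if hasattr(result, "__await__"):
            result = await result
        current = result
    return current


def add_prefix(context: Any, text: str) -> str:
    """Plain (sync) callback."""
    return f"[{context}] {text}"


async def shout(context: Any, text: str) -> str:
    """Coroutine (async) callback; it sees add_prefix's output."""
    return text.upper()


final = asyncio.run(run_before_chain([add_prefix, shout], "node_a", "hello"))
```

Because each callback receives the previous result, registration order matters: swapping the two callbacks here would leave the prefix in lower case.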
execute_on_error async
execute_on_error(context, input_data, error)

Execute all on_error callbacks for the given context.

Parameters:

  • context (CallbackContext, required): Context information about the invocation.
  • input_data (Any, required): The input data that caused the error.
  • error (Exception, required): The exception that occurred.

Returns:

  • Message | None: Recovery value from callbacks, or None if not handled.

Source code in pyagenity/utils/callbacks.py
async def execute_on_error(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Message | None:
    """
    Execute all on_error callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data that caused the error.
        error (Exception): The exception that occurred.

    Returns:
        Message | None: Recovery value from callbacks, or None if not handled.
    """
    recovery_value = None

    for callback in self._error_callbacks[context.invocation_type]:
        try:
            result = None
            if isinstance(callback, OnErrorCallback):
                result = await callback(context, input_data, error)
            elif callable(callback):
                result = callback(context, input_data, error)
                if hasattr(result, "__await__"):
                    result = await result  # type: ignore

            if isinstance(result, Message) or result is None:
                recovery_value = result
        except Exception as exc:
            logger.exception("Error callback failed: %s", exc)
            continue

    return recovery_value
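
Reading the source above, the recovery value is whatever the most recent callback returned, provided it was a Message or None; other return types are ignored, and a callback that raises is logged and skipped. The sketch below reproduces that logic in a standalone helper to show the consequences. `run_error_chain`, the stand-in `Message` dataclass, and the four handler functions are all hypothetical.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger("callbacks_sketch")
logger.addHandler(logging.NullHandler())  # keep the example output quiet
logger.propagate = False


@dataclass
class Message:
    """Stand-in for the library's Message; only the type check matters here."""

    text: str


async def run_error_chain(callbacks, context: Any, input_data: Any, error: Exception):
    """Simplified stand-in for the loop in execute_on_error."""
    recovery = None
    for callback in callbacks:
        try:
            result = callback(context, input_data, error)
            if hasattr(result, "__await__"):
                result = await result
            if isinstance(result, Message) or result is None:
                recovery = result
        except Exception:
            logger.exception("Error callback failed")
            continue
    return recovery


def recover(context, input_data, error):
    return Message(f"fallback for {error}")


def ignored(context, input_data, error):
    return "not a Message"  # neither Message nor None: recovery value untouched


def broken(context, input_data, error):
    raise RuntimeError("handler bug")  # logged and skipped


def log_only(context, input_data, error):
    return None  # a None from a later callback overwrites an earlier Message


err = ValueError("boom")
kept = asyncio.run(run_error_chain([recover, ignored, broken], None, "in", err))
reset = asyncio.run(run_error_chain([recover, log_only], None, "in", err))
```

Under this reading, a handler that should supply the recovery Message is best registered after any handlers that only log and return None.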
get_callback_counts
get_callback_counts()

Get count of registered callbacks by type for debugging.

Returns:

  • dict[str, dict[str, int]]: Counts of callbacks for each invocation type.

Source code in pyagenity/utils/callbacks.py
def get_callback_counts(self) -> dict[str, dict[str, int]]:
    """
    Get count of registered callbacks by type for debugging.

    Returns:
        dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
    """
    return {
        inv_type.value: {
            "before_invoke": len(self._before_callbacks[inv_type]),
            "after_invoke": len(self._after_callbacks[inv_type]),
            "on_error": len(self._error_callbacks[inv_type]),
        }
        for inv_type in InvocationType
    }
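
The nested dictionary returned here is keyed first by the invocation type's value and then by callback kind. As a sketch of that shape, and of how clear_callbacks affects it, the trimmed stand-in below tracks only before_invoke callbacks. `MiniCallbackManager` is hypothetical, and the enum values ("ai", "tool", "mcp") are assumed, so the real keys may differ.

```python
from enum import Enum


class InvocationType(Enum):
    # Member names are documented; the values below are assumptions.
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


class MiniCallbackManager:
    """Trimmed stand-in: keeps a before_invoke registry only."""

    def __init__(self) -> None:
        self._before = {inv_type: [] for inv_type in InvocationType}

    def register_before_invoke(self, invocation_type, callback) -> None:
        self._before[invocation_type].append(callback)

    def clear_callbacks(self, invocation_type=None) -> None:
        # None clears every invocation type, as in the documented method.
        targets = list(InvocationType) if invocation_type is None else [invocation_type]
        for inv_type in targets:
            self._before[inv_type].clear()

    def get_callback_counts(self) -> dict[str, dict[str, int]]:
        return {
            inv_type.value: {"before_invoke": len(self._before[inv_type])}
            for inv_type in InvocationType
        }


manager = MiniCallbackManager()
manager.register_before_invoke(InvocationType.AI, lambda ctx, data: data)
manager.register_before_invoke(InvocationType.TOOL, lambda ctx, data: data)
counts_before = manager.get_callback_counts()

manager.clear_callbacks(InvocationType.AI)  # only the AI registry is emptied
counts_after = manager.get_callback_counts()
```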
register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback for a specific invocation type.

Parameters:

  • invocation_type (InvocationType, required): The type of invocation (AI, TOOL, MCP).
  • callback (AfterInvokeCallbackType, required): The callback to register.

Source code in pyagenity/utils/callbacks.py
def register_after_invoke(
    self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    self._after_callbacks[invocation_type].append(callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback for a specific invocation type.

Parameters:

  • invocation_type (InvocationType, required): The type of invocation (AI, TOOL, MCP).
  • callback (BeforeInvokeCallbackType, required): The callback to register.

Source code in pyagenity/utils/callbacks.py
def register_before_invoke(
    self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    self._before_callbacks[invocation_type].append(callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback for a specific invocation type.

Parameters:

  • invocation_type (InvocationType, required): The type of invocation (AI, TOOL, MCP).
  • callback (OnErrorCallbackType, required): The callback to register.

Source code in pyagenity/utils/callbacks.py
def register_on_error(
    self, invocation_type: InvocationType, callback: OnErrorCallbackType
) -> None:
    """
    Register an on_error callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    self._error_callbacks[invocation_type].append(callback)

Command

Command object that combines state updates with control flow.

Allows nodes to update agent state and direct graph execution to specific nodes or graphs. Similar to LangGraph's Command API.

Methods:

  • __init__: Initialize a Command object.
  • __repr__: Return a string representation of the Command object.

Attributes:

  • PARENT
  • goto
  • graph
  • state
  • update

Source code in pyagenity/utils/command.py
class Command[StateT: AgentState]:
    """
    Command object that combines state updates with control flow.

    Allows nodes to update agent state and direct graph execution to specific nodes or graphs.
    Similar to LangGraph's Command API.
    """

    PARENT = "PARENT"

    def __init__(
        self,
        update: Union["StateT", None, Message, str, "BaseConverter"] = None,
        goto: str | None = None,
        graph: str | None = None,
        state: StateT | None = None,
    ):
        """
        Initialize a Command object.

        Args:
            update (StateT | None | Message | str | BaseConverter): State update to apply.
            goto (str | None): Next node to execute (node name or END).
            graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
            state (StateT | None): Optional agent state to attach.
        """
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state

    def __repr__(self) -> str:
        """
        Return a string representation of the Command object.

        Returns:
            str: String representation of the Command.
        """
        return (
            f"Command(update={self.update}, goto={self.goto}, \n"
            f" graph={self.graph}, state={self.state})"
        )

Attributes

PARENT class-attribute instance-attribute
PARENT = 'PARENT'
goto instance-attribute
goto = goto
graph instance-attribute
graph = graph
state instance-attribute
state = state
update instance-attribute
update = update

Functions

__init__
__init__(update=None, goto=None, graph=None, state=None)

Initialize a Command object.

Parameters:

Name Type Description Default
update
StateT | None | Message | str | BaseConverter

State update to apply.

None
goto
str | None

Next node to execute (node name or END).

None
graph
str | None

Which graph to navigate to (None for current, PARENT for parent).

None
state
StateT | None

Optional agent state to attach.

None
Source code in pyagenity/utils/command.py
def __init__(
    self,
    update: Union["StateT", None, Message, str, "BaseConverter"] = None,
    goto: str | None = None,
    graph: str | None = None,
    state: StateT | None = None,
):
    """
    Initialize a Command object.

    Args:
        update (StateT | None | Message | str | BaseConverter): State update to apply.
        goto (str | None): Next node to execute (node name or END).
        graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
        state (StateT | None): Optional agent state to attach.
    """
    self.update = update
    self.goto = goto
    self.graph = graph
    self.state = state
__repr__
__repr__()

Return a string representation of the Command object.

Returns:

Name Type Description
str str

String representation of the Command.

Source code in pyagenity/utils/command.py
def __repr__(self) -> str:
    """
    Return a string representation of the Command object.

    Returns:
        str: String representation of the Command.
    """
    return (
        f"Command(update={self.update}, goto={self.goto}, \n"
        f" graph={self.graph}, state={self.state})"
    )
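As a rough illustration of how a node might combine a state update with routing, the sketch below uses a stand-in `Command` that carries the same four fields. `review_node`, `next_node`, and the `END_NODE` placeholder value are hypothetical names introduced for this example; how PyAgenity itself resolves `goto` and `graph` is not shown in this section.

```python
from dataclasses import dataclass
from typing import Any, Optional

# Assumption: placeholder value. The real END constant is exported by
# pyagenity.utils.constants and may differ.
END_NODE = "end"


@dataclass
class Command:
    """Stand-in with the same fields as the documented Command."""

    PARENT = "PARENT"  # plain class attribute, not a dataclass field

    update: Any = None
    goto: Optional[str] = None
    graph: Optional[str] = None
    state: Any = None


def review_node(draft: str) -> Command:
    """Hypothetical node: accept short drafts, route long ones to an edit node."""
    if len(draft) <= 20:
        return Command(update=draft, goto=END_NODE)
    return Command(update=draft[:20], goto="edit")


def next_node(cmd: Command, default: str) -> str:
    """Hypothetical routing helper: follow goto when set, else use the default edge."""
    return cmd.goto if cmd.goto is not None else default


short = review_node("ok")
long = review_node("x" * 50)
print(next_node(short, "fallback"), next_node(long, "fallback"))
```

Leaving `goto` as `None` lets the surrounding graph decide the next step, which is why the helper takes a default.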

DataBlock

Bases: BaseModel

Data content block for messages.

Attributes:

Name Type Description
type Literal['data']

Block type discriminator.

mime_type str

MIME type of the data.

data_base64 str | None

Base64-encoded data.

media MediaRef | None

Reference to associated media.

Source code in pyagenity/utils/message.py
class DataBlock(BaseModel):
    """
    Data content block for messages.

    Attributes:
        type (Literal["data"]): Block type discriminator.
        mime_type (str): MIME type of the data.
        data_base64 (str | None): Base64-encoded data.
        media (MediaRef | None): Reference to associated media.
    """

    type: Literal["data"] = "data"
    mime_type: str
    data_base64: str | None = None
    media: MediaRef | None = None

Attributes

data_base64 class-attribute instance-attribute
data_base64 = None
media class-attribute instance-attribute
media = None
mime_type instance-attribute
mime_type
type class-attribute instance-attribute
type = 'data'
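Since `data_base64` is a string, binary payloads need to be Base64-encoded before they are stored in a block. A minimal sketch using only the standard library follows. A plain dict stands in for the model so the example runs without PyAgenity installed; the field names follow the attributes documented above.

```python
import base64

# The 8-byte PNG signature, used here as a small sample payload.
raw = b"\x89PNG\r\n\x1a\n"

# Plain dict standing in for a DataBlock instance.
block = {
    "type": "data",
    "mime_type": "image/png",
    "data_base64": base64.b64encode(raw).decode("ascii"),
    "media": None,
}

# Decoding recovers the original bytes.
decoded = base64.b64decode(block["data_base64"])
print(block["data_base64"])  # iVBORw0KGgo=
```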

ErrorBlock

Bases: BaseModel

Error content block for messages.

Attributes:

Name Type Description
type Literal['error']

Block type discriminator.

message str

Error message.

code str | None

Error code.

data dict[str, Any] | None

Additional error data.

Source code in pyagenity/utils/message.py
class ErrorBlock(BaseModel):
    """
    Error content block for messages.

    Attributes:
        type (Literal["error"]): Block type discriminator.
        message (str): Error message.
        code (str | None): Error code.
        data (dict[str, Any] | None): Additional error data.
    """

    type: Literal["error"] = "error"
    message: str
    code: str | None = None
    data: dict[str, Any] | None = None

Attributes

code class-attribute instance-attribute
code = None
data class-attribute instance-attribute
data = None
message instance-attribute
message
type class-attribute instance-attribute
type = 'error'

ExecutionState

Bases: StrEnum

Graph execution states for agent workflows.

Values

  • RUNNING: Execution is in progress.
  • PAUSED: Execution is paused.
  • COMPLETED: Execution completed successfully.
  • ERROR: Execution encountered an error.
  • INTERRUPTED: Execution was interrupted.
  • ABORTED: Execution was aborted.
  • IDLE: Execution is idle.

Attributes:

Name Type Description
ABORTED
COMPLETED
ERROR
IDLE
INTERRUPTED
PAUSED
RUNNING
Source code in pyagenity/utils/constants.py
class ExecutionState(StrEnum):
    """
    Graph execution states for agent workflows.

    Values:
        RUNNING: Execution is in progress.
        PAUSED: Execution is paused.
        COMPLETED: Execution completed successfully.
        ERROR: Execution encountered an error.
        INTERRUPTED: Execution was interrupted.
        ABORTED: Execution was aborted.
        IDLE: Execution is idle.
    """

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"

Attributes

ABORTED class-attribute instance-attribute
ABORTED = 'aborted'
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
IDLE class-attribute instance-attribute
IDLE = 'idle'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PAUSED class-attribute instance-attribute
PAUSED = 'paused'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
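Because the members are string-valued, they compare equal to their raw values and can be looked up from strings read out of storage. The sketch below uses a `str` + `Enum` stand-in so it also runs on Python versions without `StrEnum`. The `TERMINAL_STATES` grouping and `is_terminal` helper are assumptions made for illustration; the library may classify states differently.

```python
from enum import Enum


class ExecutionState(str, Enum):
    """Stand-in for the documented StrEnum, with the same member values."""

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"


# Assumption: states after which no further steps are expected.
TERMINAL_STATES = {
    ExecutionState.COMPLETED,
    ExecutionState.ERROR,
    ExecutionState.ABORTED,
}


def is_terminal(state) -> bool:
    """Hypothetical helper: accepts a member or its raw string value."""
    return ExecutionState(state) in TERMINAL_STATES


print(ExecutionState.RUNNING == "running")  # True
print(is_terminal("completed"))  # True
```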

InvocationType

Bases: Enum

Types of invocations that can trigger callbacks.

Attributes:

Name Type Description
AI
MCP
TOOL
Source code in pyagenity/utils/callbacks.py
class InvocationType(Enum):
    """Types of invocations that can trigger callbacks."""

    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"

Attributes

AI class-attribute instance-attribute
AI = 'ai'
MCP class-attribute instance-attribute
MCP = 'mcp'
TOOL class-attribute instance-attribute
TOOL = 'tool'

Message

Bases: BaseModel

Represents a message in a conversation, including content, role, metadata, and token usage.

Attributes:

Name Type Description
message_id str | int

Unique identifier for the message.

role Literal['user', 'assistant', 'system', 'tool']

The role of the message sender.

content list[ContentBlock]

The message content blocks.

delta bool

Indicates if this is a delta/partial message.

tools_calls list[dict[str, Any]] | None

Tool call information, if any.

reasoning str | None

Reasoning or explanation, if any.

timestamp datetime | None

Timestamp of the message.

metadata dict[str, Any]

Additional metadata.

usages TokenUsages | None

Token usage statistics.

raw dict[str, Any] | None

Raw data, if any.

Example

msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])

Methods:

Name Description
attach_media

Append a media block to the content.

text

Best-effort text extraction from content blocks.

text_message

Create a Message instance from plain text.

tool_message

Create a tool message (role "tool") from content blocks.

Source code in pyagenity/utils/message.py
class Message(BaseModel):
    """
    Represents a message in a conversation, including content, role, metadata, and token usage.

    Attributes:
        message_id (str | int): Unique identifier for the message.
        role (Literal["user", "assistant", "system", "tool"]): The role of the message sender.
        content (list[ContentBlock]): The message content blocks.
        delta (bool): Indicates if this is a delta/partial message.
        tools_calls (list[dict[str, Any]] | None): Tool call information, if any.
        reasoning (str | None): Reasoning or explanation, if any.
        timestamp (datetime | None): Timestamp of the message.
        metadata (dict[str, Any]): Additional metadata.
        usages (TokenUsages | None): Token usage statistics.
        raw (dict[str, Any] | None): Raw data, if any.

    Example:
        >>> msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])
        {'message_id': 'abc123', 'role': 'user', 'content': [...], ...}
    """

    message_id: str | int = Field(default_factory=lambda: generate_id(None))
    role: Literal["user", "assistant", "system", "tool"]
    content: list[ContentBlock]
    delta: bool = False  # Indicates if this is a delta/partial message
    tools_calls: list[dict[str, Any]] | None = None
    reasoning: str | None = None  # Remove it
    timestamp: datetime | None = Field(default_factory=datetime.now)
    metadata: dict[str, Any] = Field(default_factory=dict)
    usages: TokenUsages | None = None
    raw: dict[str, Any] | None = None

    @classmethod
    def text_message(
        cls,
        content: str,
        role: Literal["user", "assistant", "system", "tool"] = "user",
        message_id: str | None = None,
    ) -> "Message":
        """
        Create a Message instance from plain text.

        Args:
            content (str): The message content.
            role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
            message_id (str | None): Optional message ID.

        Returns:
            Message: The created Message instance.

        Example:
            >>> Message.text_message("Hello!", role="user")
        """
        logger.debug("Creating message from text with role: %s", role)
        return cls(
            message_id=generate_id(message_id),
            role=role,
            content=[TextBlock(text=content)],
            timestamp=datetime.now(),
            metadata={},
        )

    @classmethod
    def tool_message(
        cls,
        content: list[ContentBlock],
        message_id: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> "Message":
        """
        Create a tool message, optionally marking it as an error.

        Args:
            content (list[ContentBlock]): The message content blocks.
            message_id (str | None): Optional message ID.
            meta (dict[str, Any] | None): Optional metadata.

        Returns:
            Message: The created tool message instance.

        Example:
            >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
        """
        res = content
        msg_id = generate_id(message_id)
        return cls(
            message_id=msg_id,
            role="tool",
            content=res,
            timestamp=datetime.now(),
            metadata=meta or {},
        )

    # --- Convenience helpers ---
    def text(self) -> str:
        """
        Best-effort text extraction from content blocks.

        Returns:
            str: Concatenated text from TextBlock and ToolResultBlock outputs.

        Example:
            >>> msg.text()
            'Hello!Result text.'
        """
        parts: list[str] = []
        for block in self.content:
            if isinstance(block, TextBlock):
                parts.append(block.text)
            elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
                parts.append(block.output)
        return "".join(parts)

    def attach_media(
        self,
        media: MediaRef,
        as_type: Literal["image", "audio", "video", "document"],
    ) -> None:
        """
        Append a media block to the content.

        If content was text, creates a block list. Supports image, audio, video, and document types.

        Args:
            media (MediaRef): Reference to media content.
            as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

        Returns:
            None

        Raises:
            ValueError: If an unsupported media type is provided.

        Example:
            >>> msg.attach_media(media_ref, as_type="image")
        """
        block: ContentBlock
        if as_type == "image":
            block = ImageBlock(media=media)
        elif as_type == "audio":
            block = AudioBlock(media=media)
        elif as_type == "video":
            block = VideoBlock(media=media)
        elif as_type == "document":
            block = DocumentBlock(media=media)
        else:
            raise ValueError(f"Unsupported media type: {as_type}")

        if isinstance(self.content, str):
            self.content = [TextBlock(text=self.content), block]
        elif isinstance(self.content, list):
            self.content.append(block)
        else:
            self.content = [block]

Attributes

content instance-attribute
content
delta class-attribute instance-attribute
delta = False
message_id class-attribute instance-attribute
message_id = Field(default_factory=lambda: generate_id(None))
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
raw class-attribute instance-attribute
raw = None
reasoning class-attribute instance-attribute
reasoning = None
role instance-attribute
role
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=datetime.now)
tools_calls class-attribute instance-attribute
tools_calls = None
usages class-attribute instance-attribute
usages = None

Functions

attach_media
attach_media(media, as_type)

Append a media block to the content.

If the content is a plain string, it is first wrapped in a TextBlock so that the content becomes a block list. Supports image, audio, video, and document types.

Parameters:

Name Type Description Default
media
MediaRef

Reference to media content.

required
as_type
Literal['image', 'audio', 'video', 'document']

Type of media block to append.

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If an unsupported media type is provided.

Example

msg.attach_media(media_ref, as_type="image")

Source code in pyagenity/utils/message.py
def attach_media(
    self,
    media: MediaRef,
    as_type: Literal["image", "audio", "video", "document"],
) -> None:
    """
    Append a media block to the content.

    If content was text, creates a block list. Supports image, audio, video, and document types.

    Args:
        media (MediaRef): Reference to media content.
        as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

    Returns:
        None

    Raises:
        ValueError: If an unsupported media type is provided.

    Example:
        >>> msg.attach_media(media_ref, as_type="image")
    """
    block: ContentBlock
    if as_type == "image":
        block = ImageBlock(media=media)
    elif as_type == "audio":
        block = AudioBlock(media=media)
    elif as_type == "video":
        block = VideoBlock(media=media)
    elif as_type == "document":
        block = DocumentBlock(media=media)
    else:
        raise ValueError(f"Unsupported media type: {as_type}")

    if isinstance(self.content, str):
        self.content = [TextBlock(text=self.content), block]
    elif isinstance(self.content, list):
        self.content.append(block)
    else:
        self.content = [block]
text
text()

Best-effort text extraction from content blocks.

Returns:

Name Type Description
str str

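Text from TextBlock items and string-valued ToolResultBlock outputs, concatenated in order with no separator. Other block types and non-string outputs are skipped.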

Example

msg.text()
'Hello!Result text.'

Source code in pyagenity/utils/message.py
def text(self) -> str:
    """
    Best-effort text extraction from content blocks.

    Returns:
        str: Concatenated text from TextBlock and ToolResultBlock outputs.

    Example:
        >>> msg.text()
        'Hello!Result text.'
    """
    parts: list[str] = []
    for block in self.content:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)
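To make the skipping behaviour concrete, the sketch below reproduces the same extraction loop over simplified stand-in block classes. The dataclasses here are illustrative only (the real blocks are Pydantic models with more fields), and the `url` field on `ImageBlock` is hypothetical.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TextBlock:
    text: str


@dataclass
class ToolResultBlock:
    call_id: str
    output: Any = None


@dataclass
class ImageBlock:
    url: str  # hypothetical field, present only to show a skipped block


def extract_text(content) -> str:
    """Same logic as Message.text(), applied to a plain list of blocks."""
    parts = []
    for block in content:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)


content = [
    TextBlock("Hello!"),
    ImageBlock("cat.png"),                   # not text: skipped
    ToolResultBlock("call-1", {"rows": 3}),  # dict output: skipped
    ToolResultBlock("call-2", "Result text."),
]
print(extract_text(content))  # Hello!Result text.
```

Note that the parts are joined without a separator, so callers who need spacing between blocks have to add it themselves.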
text_message classmethod
text_message(content, role='user', message_id=None)

Create a Message instance from plain text.

Parameters:

Name Type Description Default
content
str

The message content.

required
role
Literal['user', 'assistant', 'system', 'tool']

The role of the sender.

'user'
message_id
str | None

Optional message ID.

None

Returns:

Name Type Description
Message Message

The created Message instance.

Example

Message.text_message("Hello!", role="user")

Source code in pyagenity/utils/message.py
@classmethod
def text_message(
    cls,
    content: str,
    role: Literal["user", "assistant", "system", "tool"] = "user",
    message_id: str | None = None,
) -> "Message":
    """
    Create a Message instance from plain text.

    Args:
        content (str): The message content.
        role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
        message_id (str | None): Optional message ID.

    Returns:
        Message: The created Message instance.

    Example:
        >>> Message.text_message("Hello!", role="user")
    """
    logger.debug("Creating message from text with role: %s", role)
    return cls(
        message_id=generate_id(message_id),
        role=role,
        content=[TextBlock(text=content)],
        timestamp=datetime.now(),
        metadata={},
    )
tool_message classmethod
tool_message(content, message_id=None, meta=None)

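Create a tool message (role "tool") from content blocks. The method takes no error flag; to signal a failure, set is_error on the ToolResultBlock passed in content.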

Parameters:

Name Type Description Default
content
list[ContentBlock]

The message content blocks.

required
message_id
str | None

Optional message ID.

None
meta
dict[str, Any] | None

Optional metadata.

None

Returns:

Name Type Description
Message Message

The created tool message instance.

Example

Message.tool_message([ToolResultBlock(...)], message_id="tool1")

Source code in pyagenity/utils/message.py
@classmethod
def tool_message(
    cls,
    content: list[ContentBlock],
    message_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> "Message":
    """
    Create a tool message, optionally marking it as an error.

    Args:
        content (list[ContentBlock]): The message content blocks.
        message_id (str | None): Optional message ID.
        meta (dict[str, Any] | None): Optional metadata.

    Returns:
        Message: The created tool message instance.

    Example:
        >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
    """
    res = content
    msg_id = generate_id(message_id)
    return cls(
        message_id=msg_id,
        role="tool",
        content=res,
        timestamp=datetime.now(),
        metadata=meta or {},
    )

OnErrorCallback

Bases: ABC

Abstract base class for on_error callbacks.

Called when an error occurs during invocation. Allows for error handling and logging.

Methods:

Name Description
__call__

Execute the on_error callback.

Source code in pyagenity/utils/callbacks.py
class OnErrorCallback(ABC):
    """Abstract base class for on_error callbacks.

    Called when an error occurs during invocation.
    Allows for error handling and logging.
    """

    @abstractmethod
    async def __call__(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Any | None:
        """Execute the on_error callback.

        Args:
            context: Context information about the invocation
            input_data: The input data that caused the error
            error: The exception that occurred

        Returns:
            Optional recovery value or None to re-raise the error

        Raises:
            Exception: If error handling fails or if the error should be re-raised
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data, error)

Execute the on_error callback.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation

required
input_data
Any

The input data that caused the error

required
error
Exception

The exception that occurred

required

Returns:

Type Description
Any | None

Optional recovery value or None to re-raise the error

Raises:

Type Description
Exception

If error handling fails or if the error should be re-raised

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Any | None:
    """Execute the on_error callback.

    Args:
        context: Context information about the invocation
        input_data: The input data that caused the error
        error: The exception that occurred

    Returns:
        Optional recovery value or None to re-raise the error

    Raises:
        Exception: If error handling fails or if the error should be re-raised
    """
    ...
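The return contract described above (a recovery value, or `None` to re-raise) can be sketched as follows. The base class is a stand-in with the documented signature, with `context` typed loosely since `CallbackContext` is not reproduced here. `FallbackOnTimeout`, `invoke_with_recovery`, and `slow_tool` are hypothetical names for this example, and the runner is an assumption about how such a callback might be consumed, not PyAgenity's actual dispatch code.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class OnErrorCallback(ABC):
    """Stand-in with the documented __call__ signature."""

    @abstractmethod
    async def __call__(self, context: Any, input_data: Any, error: Exception) -> Any:
        ...


class FallbackOnTimeout(OnErrorCallback):
    """Hypothetical callback: recover from timeouts, let other errors propagate."""

    async def __call__(self, context, input_data, error):
        if isinstance(error, TimeoutError):
            return {"status": "fallback", "input": input_data}
        return None  # None means "no recovery value"


async def invoke_with_recovery(func, input_data, on_error):
    """Hypothetical runner: use the recovery value, or re-raise when it is None."""
    try:
        return await func(input_data)
    except Exception as exc:
        recovered = await on_error(None, input_data, exc)
        if recovered is None:
            raise
        return recovered


async def slow_tool(query):
    raise TimeoutError("tool timed out")


result = asyncio.run(invoke_with_recovery(slow_tool, "weather", FallbackOnTimeout()))
print(result)  # {'status': 'fallback', 'input': 'weather'}
```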

ReasoningBlock

Bases: BaseModel

Reasoning content block for messages.

Attributes:

Name Type Description
type Literal['reasoning']

Block type discriminator.

summary str

Summary of reasoning.

details list[str] | None

Detailed reasoning steps.

Source code in pyagenity/utils/message.py
class ReasoningBlock(BaseModel):
    """
    Reasoning content block for messages.

    Attributes:
        type (Literal["reasoning"]): Block type discriminator.
        summary (str): Summary of reasoning.
        details (list[str] | None): Detailed reasoning steps.
    """

    type: Literal["reasoning"] = "reasoning"
    summary: str
    details: list[str] | None = None

Attributes

details class-attribute instance-attribute
details = None
summary instance-attribute
summary
type class-attribute instance-attribute
type = 'reasoning'

ResponseGranularity

Bases: StrEnum

Response granularity options for agent graph outputs.

Values

  • FULL: State, latest messages.
  • PARTIAL: Context, summary, latest messages.
  • LOW: Only latest messages.

Attributes:

Name Type Description
FULL
LOW
PARTIAL
Source code in pyagenity/utils/constants.py
class ResponseGranularity(StrEnum):
    """
    Response granularity options for agent graph outputs.

    Values:
        FULL: State, latest messages.
        PARTIAL: Context, summary, latest messages.
        LOW: Only latest messages.
    """

    FULL = "full"
    PARTIAL = "partial"
    LOW = "low"

Attributes

FULL class-attribute instance-attribute
FULL = 'full'
LOW class-attribute instance-attribute
LOW = 'low'
PARTIAL class-attribute instance-attribute
PARTIAL = 'partial'

StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
ALL

Save everything including tool calls.

MEDIUM

Only AI and human messages.

LOW

Only first human and last AI message.

Source code in pyagenity/utils/constants.py
class StorageLevel:
    """
    Message storage levels for agent state persistence.

    Attributes:
        ALL: Save everything including tool calls.
        MEDIUM: Only AI and human messages.
        LOW: Only first human and last AI message.
    """

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"

Attributes

ALL class-attribute instance-attribute
ALL = 'all'
LOW class-attribute instance-attribute
LOW = 'low'
MEDIUM class-attribute instance-attribute
MEDIUM = 'medium'
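One way to read the three levels is as filters over a conversation history. The sketch below is an interpretation of the descriptions above, not the library's persistence code. `select_for_storage` is a hypothetical function, messages are simplified to `(role, text)` tuples, and it assumes "human" maps to the `user` role and "AI" to the `assistant` role.

```python
class StorageLevel:
    """Stand-in mirroring the documented constants."""

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"


def select_for_storage(messages, level):
    """Hypothetical filter over (role, text) tuples."""
    if level == StorageLevel.ALL:
        return list(messages)

    # MEDIUM and LOW both drop tool and system traffic.
    chat = [m for m in messages if m[0] in ("user", "assistant")]
    if level == StorageLevel.MEDIUM:
        return chat

    if level == StorageLevel.LOW:
        first_user = next((m for m in chat if m[0] == "user"), None)
        last_ai = next((m for m in reversed(chat) if m[0] == "assistant"), None)
        return [m for m in (first_user, last_ai) if m is not None]

    raise ValueError(f"Unknown storage level: {level}")


history = [
    ("user", "What is 2 + 2?"),
    ("assistant", "Let me calculate."),
    ("tool", "4"),
    ("assistant", "The answer is 4."),
    ("user", "Thanks"),
]

for level in (StorageLevel.ALL, StorageLevel.MEDIUM, StorageLevel.LOW):
    print(level, len(select_for_storage(history, level)))
```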

TextBlock

Bases: BaseModel

Text content block for messages.

Attributes:

Name Type Description
type Literal['text']

Block type discriminator.

text str

Text content.

annotations list[AnnotationRef]

List of annotation references.

Source code in pyagenity/utils/message.py
class TextBlock(BaseModel):
    """
    Text content block for messages.

    Attributes:
        type (Literal["text"]): Block type discriminator.
        text (str): Text content.
        annotations (list[AnnotationRef]): List of annotation references.
    """

    type: Literal["text"] = "text"
    text: str
    annotations: list[AnnotationRef] = Field(default_factory=list)

Attributes

annotations class-attribute instance-attribute
annotations = Field(default_factory=list)
text instance-attribute
text
type class-attribute instance-attribute
type = 'text'

ThreadInfo

Bases: BaseModel

Metadata and status for a thread in agent execution.

Attributes:

Name Type Description
thread_id int | str

Unique identifier for the thread.

thread_name str | None

Optional name for the thread.

user_id int | str | None

Optional user identifier associated with the thread.

metadata dict[str, Any] | None

Optional metadata for the thread.

updated_at datetime | None

Timestamp of last update.

stop_requested bool

Whether a stop has been requested for the thread.

run_id str | None

Optional run identifier for the thread execution.

Example

ThreadInfo(thread_id=1, thread_name="main", user_id=42)

Source code in pyagenity/utils/thread_info.py
class ThreadInfo(BaseModel):
    """
    Metadata and status for a thread in agent execution.

    Attributes:
        thread_id (int | str): Unique identifier for the thread.
        thread_name (str | None): Optional name for the thread.
        user_id (int | str | None): Optional user identifier associated with the thread.
        metadata (dict[str, Any] | None): Optional metadata for the thread.
        updated_at (datetime | None): Timestamp of last update.
        stop_requested (bool): Whether a stop has been requested for the thread.
        run_id (str | None): Optional run identifier for the thread execution.

    Example:
        >>> ThreadInfo(thread_id=1, thread_name="main", user_id=42)
    """

    thread_id: int | str
    thread_name: str | None = None
    user_id: int | str | None = None
    metadata: dict[str, Any] | None = None
    updated_at: datetime | None = None
    run_id: str | None = None

Attributes

metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
thread_id instance-attribute
thread_id
thread_name class-attribute instance-attribute
thread_name = None
updated_at class-attribute instance-attribute
updated_at = None
user_id class-attribute instance-attribute
user_id = None

TokenUsages

Bases: BaseModel

Tracks token usage statistics for a message or model response.

Attributes:

Name Type Description
completion_tokens int

Number of completion tokens used.

prompt_tokens int

Number of prompt tokens used.

total_tokens int

Total tokens used.

reasoning_tokens int

Reasoning tokens used (optional).

cache_creation_input_tokens int

Cache creation input tokens (optional).

cache_read_input_tokens int

Cache read input tokens (optional).

image_tokens int | None

Image tokens for multimodal models (optional).

audio_tokens int | None

Audio tokens for multimodal models (optional).

Example

usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)

Source code in pyagenity/utils/message.py
class TokenUsages(BaseModel):
    """
    Tracks token usage statistics for a message or model response.

    Attributes:
        completion_tokens (int): Number of completion tokens used.
        prompt_tokens (int): Number of prompt tokens used.
        total_tokens (int): Total tokens used.
        reasoning_tokens (int): Reasoning tokens used (optional).
        cache_creation_input_tokens (int): Cache creation input tokens (optional).
        cache_read_input_tokens (int): Cache read input tokens (optional).
        image_tokens (int | None): Image tokens for multimodal models (optional).
        audio_tokens (int | None): Audio tokens for multimodal models (optional).

    Example:
        >>> usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)
        {'completion_tokens': 10, 'prompt_tokens': 20, 'total_tokens': 30, ...}
    """

    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    # Optional modality-specific usage fields for multimodal models
    image_tokens: int | None = 0
    audio_tokens: int | None = 0

Attributes

audio_tokens class-attribute instance-attribute
audio_tokens = 0
cache_creation_input_tokens class-attribute instance-attribute
cache_creation_input_tokens = 0
cache_read_input_tokens class-attribute instance-attribute
cache_read_input_tokens = 0
completion_tokens instance-attribute
completion_tokens
image_tokens class-attribute instance-attribute
image_tokens = 0
prompt_tokens instance-attribute
prompt_tokens
reasoning_tokens class-attribute instance-attribute
reasoning_tokens = 0
total_tokens instance-attribute
total_tokens
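A common use of per-message usage records is to total them across a conversation. The sketch below uses a dataclass stand-in with the same fields and defaults as the model above. `add_usages` is a hypothetical helper, and treating `None` as zero for the optional modality fields is an assumption.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class TokenUsages:
    """Stand-in mirroring the documented fields and defaults."""

    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    image_tokens: Optional[int] = 0
    audio_tokens: Optional[int] = 0


def add_usages(a: TokenUsages, b: TokenUsages) -> TokenUsages:
    """Hypothetical helper: field-wise sum, counting None as 0."""
    summed = {
        f.name: (getattr(a, f.name) or 0) + (getattr(b, f.name) or 0)
        for f in fields(TokenUsages)
    }
    return TokenUsages(**summed)


first = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)
second = TokenUsages(completion_tokens=5, prompt_tokens=15, total_tokens=20, image_tokens=None)
total = add_usages(first, second)
print(total.total_tokens)  # 50
```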

ToolCallBlock

Bases: BaseModel

Tool call content block for messages.

Attributes:

Name Type Description
type Literal['tool_call']

Block type discriminator.

id str

Tool call ID.

name str

Tool name.

args dict[str, Any]

Arguments for the tool call.

tool_type str | None

Type of tool (e.g., web_search, file_search).

Source code in pyagenity/utils/message.py
class ToolCallBlock(BaseModel):
    """
    Tool call content block for messages.

    Attributes:
        type (Literal["tool_call"]): Block type discriminator.
        id (str): Tool call ID.
        name (str): Tool name.
        args (dict[str, Any]): Arguments for the tool call.
        tool_type (str | None): Type of tool (e.g., web_search, file_search).
    """

    type: Literal["tool_call"] = "tool_call"
    id: str
    name: str
    args: dict[str, Any] = Field(default_factory=dict)
    tool_type: str | None = None  # e.g., web_search, file_search, computer_use

Attributes

args class-attribute instance-attribute
args = Field(default_factory=dict)
id instance-attribute
id
name instance-attribute
name
tool_type class-attribute instance-attribute
tool_type = None
type class-attribute instance-attribute
type = 'tool_call'

ToolResultBlock

Bases: BaseModel

Tool result content block for messages.

Attributes:

Name Type Description
type Literal['tool_result']

Block type discriminator.

call_id str

Tool call ID.

output Any

Output from the tool (str, dict, MediaRef, or list of blocks).

is_error bool

Whether the result is an error.

status Literal['completed', 'failed'] | None

Status of the tool call.

Source code in pyagenity/utils/message.py
class ToolResultBlock(BaseModel):
    """
    Tool result content block for messages.

    Attributes:
        type (Literal["tool_result"]): Block type discriminator.
        call_id (str): Tool call ID.
        output (Any): Output from the tool (str, dict, MediaRef, or list of blocks).
        is_error (bool): Whether the result is an error.
        status (Literal["completed", "failed"] | None): Status of the tool call.
    """

    type: Literal["tool_result"] = "tool_result"
    call_id: str
    output: Any = None  # str | dict | MediaRef | list[ContentBlock-like]
    is_error: bool = False
    status: Literal["completed", "failed"] | None = None

Attributes

call_id instance-attribute
call_id
is_error class-attribute instance-attribute
is_error = False
output class-attribute instance-attribute
output = None
status class-attribute instance-attribute
status = None
type class-attribute instance-attribute
type = 'tool_result'

Functions

add_messages

add_messages(left, right)

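Appends the messages in right to left, skipping any whose message_id already appears in left. Only IDs present in left are checked, so duplicates within right itself are kept.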

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages.

required

right

list[Message]

New messages to add.

required

Returns:

Type Description
list[Message]

Combined list with unique messages.

Example

>>> add_messages([msg1], [msg2, msg1])
[msg1, msg2]

Source code in pyagenity/utils/reducers.py
def add_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Adds messages to the list, avoiding duplicates by message_id.

    Args:
        left (list[Message]): Existing list of messages.
        right (list[Message]): New messages to add.

    Returns:
        list[Message]: Combined list with unique messages.

    Example:
        >>> add_messages([msg1], [msg2, msg1])
        [msg1, msg2]
    """
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids]
    return left + right
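
A minimal sketch of the deduplication behavior, assuming a hypothetical `Msg` dataclass as a stand-in for `Message` (only the `message_id` attribute matters here) and a local copy of the reducer logic from the listing above. It also illustrates that only IDs already present in `left` are filtered.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for Message; only message_id is used."""

    message_id: str
    text: str = ""


def add_messages(left: list, right: list) -> list:
    # Same logic as the listing above: drop right-hand messages whose ID is in left.
    left_ids = {msg.message_id for msg in left}
    return left + [msg for msg in right if msg.message_id not in left_ids]


m1, m2 = Msg("1", "hello"), Msg("2", "world")

merged = add_messages([m1], [m2, m1])
print([m.message_id for m in merged])  # ['1', '2']

# Duplicates that occur only within `right` are not removed.
doubled = add_messages([m1], [m2, m2])
print([m.message_id for m in doubled])  # ['1', '2', '2']
```

If `right` may contain repeated IDs, deduplicating it before calling the reducer is one way to keep the result unique.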

append_items

append_items(left, right)

Appends the items in right to left, skipping any whose id attribute already appears in left. Every item in both lists must expose an .id attribute.

Parameters:

Name Type Description Default

left

list

Existing list of items (must have .id attribute).

required

right

list

New items to add.

required

Returns:

Name Type Description
list list

Combined list with unique items.

Example

>>> append_items([item1], [item2, item1])
[item1, item2]

Source code in pyagenity/utils/reducers.py
def append_items(left: list, right: list) -> list:
    """
    Appends items to a list, avoiding duplicates by item.id.

    Args:
        left (list): Existing list of items (must have .id attribute).
        right (list): New items to add.

    Returns:
        list: Combined list with unique items.

    Example:
        >>> append_items([item1], [item2, item1])
        [item1, item2]
    """
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right
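
The `.id` requirement can be illustrated with a short sketch. It uses a local copy of the reducer and `types.SimpleNamespace` objects as hypothetical stand-ins for state items; values without an `id` attribute, such as plain strings, fail with `AttributeError`.

```python
from types import SimpleNamespace


def append_items(left: list, right: list) -> list:
    # Local copy of the reducer shown in the listing above.
    left_ids = {item.id for item in left}
    return left + [item for item in right if item.id not in left_ids]


a = SimpleNamespace(id="a")
b = SimpleNamespace(id="b")

combined = append_items([a], [b, a])
print([item.id for item in combined])  # ['a', 'b']

# Items without an `id` attribute are rejected with AttributeError.
try:
    append_items(["plain string"], [b])
except AttributeError as exc:
    error_name = type(exc).__name__
    print(error_name)  # AttributeError
```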

call_sync_or_async async

call_sync_or_async(func, *args, **kwargs)

Call a function that may be sync or async, returning its result.

An async callable is awaited directly. A synchronous callable runs in a worker thread (via asyncio.to_thread) so that it does not block the event loop; if it returns an awaitable, that awaitable is awaited before the result is returned.

Parameters:

Name Type Description Default

func

Callable[..., Any]

The function to call.

required

*args

Positional arguments for the function.

()

**kwargs

Keyword arguments for the function.

{}

Returns:

Name Type Description
Any Any

The result of the function call, awaited if necessary.

Source code in pyagenity/utils/callable_utils.py
async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    """
    Call a function that may be sync or async, returning its result.

    If the function is synchronous, it runs in a thread pool to avoid blocking
    the event loop. If the result is awaitable, it is awaited before returning.

    Args:
        func (Callable[..., Any]): The function to call.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        Any: The result of the function call, awaited if necessary.
    """
    if _is_async_callable(func):
        return await func(*args, **kwargs)

    # Call sync function in a thread pool
    result = await asyncio.to_thread(func, *args, **kwargs)
    # If the result is awaitable, await it
    if inspect.isawaitable(result):
        return await result
    return result
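
A self-contained sketch of the dispatch logic, for illustration only. It assumes `inspect.iscoroutinefunction` as a stand-in for the private `_is_async_callable` helper, whose implementation is not shown here; `sync_add`, `async_add`, and `thread_name` are hypothetical example callables.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable


async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    # Assumption: inspect.iscoroutinefunction stands in for _is_async_callable.
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Sync callables run in a worker thread so the event loop stays responsive.
    result = await asyncio.to_thread(func, *args, **kwargs)
    if inspect.isawaitable(result):
        return await result
    return result


def sync_add(a: int, b: int) -> int:
    return a + b


async def async_add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def thread_name() -> str:
    # Reports which thread the sync callable runs on.
    return threading.current_thread().name


async def main() -> tuple:
    return (
        await call_sync_or_async(sync_add, 1, 2),
        await call_sync_or_async(async_add, 3, b=4),
        await call_sync_or_async(thread_name),
    )


sync_result, async_result, worker = asyncio.run(main())
print(sync_result, async_result)  # 3 7
print(worker != threading.main_thread().name)  # True: ran in a worker thread
```

Because synchronous callables run off the main thread, they should avoid touching objects that are only safe to use from the event loop thread.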

convert_messages

convert_messages(system_prompts, state=None, extra_messages=None)

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Parameters:

Name Type Description Default

system_prompts

list[dict[str, Any]]

List of system prompt dicts.

required

state

AgentState | None

Optional agent state containing context and summary.

None

extra_messages

list[Message] | None

Optional extra messages to include.

None

Returns:

Type Description
list[dict[str, Any]]

List of message dicts for payloads.

Raises:

Type Description
ValueError

If system_prompts is None.

Source code in pyagenity/utils/converter.py
def convert_messages(
    system_prompts: list[dict[str, Any]],
    state: Union["AgentState", None] = None,
    extra_messages: list[Message] | None = None,
) -> list[dict[str, Any]]:
    """
    Convert system prompts, agent state, and extra messages to a list of dicts for
    LLM/tool payloads.

    Args:
        system_prompts (list[dict[str, Any]]): List of system prompt dicts.
        state (AgentState | None): Optional agent state containing context and summary.
        extra_messages (list[Message] | None): Optional extra messages to include.

    Returns:
        list[dict[str, Any]]: List of message dicts for payloads.

    Raises:
        ValueError: If system_prompts is None.
    """
    if system_prompts is None:
        logger.error("System prompts are None")
        raise ValueError("System prompts cannot be None")

    res = []
    res += system_prompts

    if state and state.context_summary:
        summary = {
            "role": "assistant",
            "content": state.context_summary if state.context_summary else "",
        }
        res.append(summary)

    if state and state.context:
        for msg in state.context:
            res.append(_convert_dict(msg))

    if extra_messages:
        for msg in extra_messages:
            res.append(_convert_dict(msg))

    logger.debug("Number of Converted messages: %s", len(res))
    return res
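
The ordering of the resulting payload (system prompts, then the context summary, then the context messages, then the extra messages) can be sketched as follows. This is an illustration under assumptions, not the library code: `to_dict` is a hypothetical stand-in for the private `_convert_dict` helper, and the `role` and `content` attributes on the stand-in messages are assumed for the example.

```python
from types import SimpleNamespace
from typing import Any


def to_dict(msg: Any) -> dict:
    # Hypothetical stand-in for the private _convert_dict helper.
    return {"role": msg.role, "content": msg.content}


def convert_messages_sketch(system_prompts, state=None, extra_messages=None):
    if system_prompts is None:
        raise ValueError("System prompts cannot be None")
    res = list(system_prompts)  # 1. system prompts come first
    if state and state.context_summary:
        # 2. the summary is added as an assistant message
        res.append({"role": "assistant", "content": state.context_summary})
    if state and state.context:
        res.extend(to_dict(m) for m in state.context)  # 3. conversation context
    if extra_messages:
        res.extend(to_dict(m) for m in extra_messages)  # 4. extra messages last
    return res


state = SimpleNamespace(
    context_summary="User asked about pricing.",
    context=[SimpleNamespace(role="user", content="And the yearly plan?")],
)
extra = [SimpleNamespace(role="user", content="Reply briefly.")]

payload = convert_messages_sketch(
    [{"role": "system", "content": "You are helpful."}], state, extra
)
print([m["role"] for m in payload])  # ['system', 'assistant', 'user', 'user']
```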

register_after_invoke

register_after_invoke(invocation_type, callback)

Register an after_invoke callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

AfterInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_after_invoke(
    invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_after_invoke(invocation_type, callback)

register_before_invoke

register_before_invoke(invocation_type, callback)

Register a before_invoke callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

BeforeInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_before_invoke(
    invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_before_invoke(invocation_type, callback)

register_on_error

register_on_error(invocation_type, callback)

Register an on_error callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

OnErrorCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_on_error(invocation_type: InvocationType, callback: OnErrorCallbackType) -> None:
    """
    Register an on_error callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    default_callback_manager.register_on_error(invocation_type, callback)

replace_messages

replace_messages(left, right)

Replaces the entire message list with a new one.

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages (ignored).

required

right

list[Message]

New list of messages.

required

Returns:

Type Description
list[Message]

The new message list.

Example

>>> replace_messages([msg1], [msg2])
[msg2]

Source code in pyagenity/utils/reducers.py
def replace_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Replaces the entire message list with a new one.

    Args:
        left (list[Message]): Existing list of messages (ignored).
        right (list[Message]): New list of messages.

    Returns:
        list[Message]: The new message list.

    Example:
        >>> replace_messages([msg1], [msg2])
        [msg2]
    """
    return right

replace_value

replace_value(left, right)

Replaces a value with another.

Parameters:

Name Type Description Default

left

Existing value (ignored).

required

right

New value to use.

required

Returns:

Name Type Description
Any

The new value.

Example

>>> replace_value(1, 2)
2

Source code in pyagenity/utils/reducers.py
def replace_value(left, right):
    """
    Replaces a value with another.

    Args:
        left: Existing value (ignored).
        right: New value to use.

    Returns:
        Any: The new value.

    Example:
        >>> replace_value(1, 2)
        2
    """
    return right

run_coroutine

run_coroutine(func)

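Run a coroutine object to completion from synchronous code and return its result. If no event loop is running in the current thread, asyncio.run is used; otherwise the coroutine is submitted to the running loop with asyncio.run_coroutine_threadsafe and the call blocks until the result is available.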

Source code in pyagenity/utils/callable_utils.py
def run_coroutine(func: Coroutine) -> Any:
    """Run an async coroutine from a sync context safely."""
    # Always try to get/create an event loop and use thread-safe execution
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running, create one
        return asyncio.run(func)

    # Loop is running, use thread-safe execution
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()
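
A short usage sketch for the common case, calling from plain synchronous code with no running loop. It uses a local copy of the helper, and `fetch_answer` is a hypothetical coroutine. Since `asyncio.get_running_loop()` only succeeds on the thread that is running the loop, and `fut.result()` blocks the calling thread, code that is already inside a coroutine is likely better served by awaiting the coroutine directly.

```python
import asyncio
from typing import Any, Coroutine


def run_coroutine(func: Coroutine) -> Any:
    # Local copy of the helper shown in the listing above.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: run the coroutine on a fresh loop.
        return asyncio.run(func)
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()


async def fetch_answer() -> int:
    await asyncio.sleep(0.01)
    return 42


# Called from plain synchronous code: no loop is running, so asyncio.run is used.
answer = run_coroutine(fetch_answer())
print(answer)  # 42
```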

Modules

background_task_manager

Background task manager for async operations in PyAgenity.

This module provides BackgroundTaskManager, which tracks and manages asyncio background tasks, ensuring proper cleanup and error logging.

Classes:

Name Description
BackgroundTaskManager

Manages asyncio background tasks for agent operations.

TaskMetadata

Metadata for tracking background tasks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

BackgroundTaskManager

Manages asyncio background tasks for agent operations.

Tracks created tasks, ensures cleanup, and logs errors from background execution. Enhanced with cancellation, timeouts, and metadata tracking.

Methods:

Name Description
__init__

Initialize the BackgroundTaskManager.

cancel_all

Cancel all tracked background tasks.

create_task

Create and track a background asyncio task.

get_task_count

Get the number of active background tasks.

get_task_info

Get information about all active tasks.

wait_for_all

Wait for all tracked background tasks to complete.

Source code in pyagenity/utils/background_task_manager.py
class BackgroundTaskManager:
    """
    Manages asyncio background tasks for agent operations.

    Tracks created tasks, ensures cleanup, and logs errors from background execution.
    Enhanced with cancellation, timeouts, and metadata tracking.
    """

    def __init__(self):
        """
        Initialize the BackgroundTaskManager.
        """
        self._tasks: set[asyncio.Task] = set()
        self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}

    def create_task(
        self,
        coro: Coroutine,
        *,
        name: str = "background_task",
        timeout: float | None = None,
        context: dict[str, Any] | None = None,
    ) -> asyncio.Task:
        """
        Create and track a background asyncio task.

        Args:
            coro (Coroutine): The coroutine to run in the background.
            name (str): Human-readable name for the task.
            timeout (Optional[float]): Timeout in seconds for the task.
            context (Optional[dict]): Additional context for logging.

        Returns:
            asyncio.Task: The created task.
        """
        metrics.counter("background_task_manager.tasks_created").inc()

        task = asyncio.create_task(coro, name=name)
        metadata = TaskMetadata(
            name=name, created_at=time.time(), timeout=timeout, context=context or {}
        )

        self._tasks.add(task)
        self._task_metadata[task] = metadata
        task.add_done_callback(self._task_done_callback)

        # Set up timeout if specified
        if timeout:
            self._setup_timeout(task, timeout)

        logger.debug(
            "Created background task: %s (timeout=%s)",
            name,
            timeout,
            extra={"task_context": context},
        )

        return task

    def _setup_timeout(self, task: asyncio.Task, timeout: float) -> None:
        """Set up timeout cancellation for a task."""

        async def timeout_canceller():
            try:
                await asyncio.sleep(timeout)
                if not task.done():
                    metadata = self._task_metadata.get(task)
                    task_name = metadata.name if metadata else "unknown"
                    logger.warning(
                        "Background task '%s' timed out after %s seconds", task_name, timeout
                    )
                    task.cancel()
                    metrics.counter("background_task_manager.tasks_timed_out").inc()
            except asyncio.CancelledError:
                pass  # Parent task was cancelled, this is expected

        # Create the timeout task but don't track it (avoid recursive tracking)
        timeout_task = asyncio.create_task(timeout_canceller())
        # Add a callback to clean up the timeout task reference
        timeout_task.add_done_callback(lambda t: None)

    def _task_done_callback(self, task: asyncio.Task) -> None:
        """
        Remove completed task and log exceptions if any.

        Args:
            task (asyncio.Task): The completed asyncio task.
        """
        metadata = self._task_metadata.pop(task, None)
        self._tasks.discard(task)

        task_name = metadata.name if metadata else "unknown"
        duration = time.time() - metadata.created_at if metadata else 0.0

        try:
            task.result()  # raises if task failed
            metrics.counter("background_task_manager.tasks_completed").inc()
            logger.debug(
                "Background task '%s' completed successfully (duration=%.2fs)",
                task_name,
                duration,
                extra={"task_context": metadata.context if metadata else {}},
            )
        except asyncio.CancelledError:
            metrics.counter("background_task_manager.tasks_cancelled").inc()
            logger.debug("Background task '%s' was cancelled", task_name)
        except Exception as e:
            metrics.counter("background_task_manager.tasks_failed").inc()
            error_msg = (
                f"Background task raised an exception - {task_name}: {e} (duration={duration:.2f}s)"
            )
            logger.error(
                error_msg,
                exc_info=e,
                extra={"task_context": metadata.context if metadata else {}},
            )

    async def cancel_all(self) -> None:
        """
        Cancel all tracked background tasks.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Cancelling %d background tasks...", len(self._tasks))

        for task in self._tasks.copy():
            if not task.done():
                task.cancel()

        # Wait a short time for cancellations to process
        await asyncio.sleep(0.1)

    async def wait_for_all(
        self, timeout: float | None = None, return_exceptions: bool = False
    ) -> None:
        """
        Wait for all tracked background tasks to complete.

        Args:
            timeout (float | None): Maximum time to wait in seconds.
            return_exceptions (bool): If True, exceptions are returned as results instead of raised.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

        try:
            if timeout:
                await asyncio.wait_for(
                    asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                    timeout=timeout,
                )
            else:
                await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
            logger.info("All background tasks finished.")
        except TimeoutError:
            logger.warning("Timeout waiting for background tasks, some may still be running")
            metrics.counter("background_task_manager.wait_timeout").inc()

    def get_task_count(self) -> int:
        """Get the number of active background tasks."""
        return len(self._tasks)

    def get_task_info(self) -> list[dict[str, Any]]:
        """Get information about all active tasks."""
        current_time = time.time()
        return [
            {
                "name": metadata.name,
                "age_seconds": current_time - metadata.created_at,
                "timeout": metadata.timeout,
                "context": metadata.context,
                "done": task.done(),
                "cancelled": task.cancelled() if task.done() else False,
            }
            for task, metadata in self._task_metadata.items()
        ]
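
The core tracking pattern in the class above (hold a strong reference to each task, then drop it and inspect the outcome in a done callback) can be sketched in isolation. `MiniTaskManager` is a hypothetical, stripped-down stand-in written for this example; it omits the metrics, timeouts, and metadata of the real class.

```python
import asyncio


class MiniTaskManager:
    """Hypothetical, stripped-down stand-in for BackgroundTaskManager."""

    def __init__(self) -> None:
        self._tasks = set()
        self.failures = []

    def create_task(self, coro, *, name: str = "background_task") -> asyncio.Task:
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)  # strong reference keeps the task alive
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            # Retrieving the exception here avoids "never retrieved" warnings.
            self.failures.append(f"{task.get_name()}: {task.exception()!r}")

    def get_task_count(self) -> int:
        return len(self._tasks)


async def ok() -> str:
    await asyncio.sleep(0.01)
    return "done"


async def boom() -> None:
    raise RuntimeError("boom")


async def main():
    manager = MiniTaskManager()
    manager.create_task(ok(), name="ok")
    manager.create_task(boom(), name="boom")
    started = manager.get_task_count()
    await asyncio.sleep(0.05)  # let both tasks finish
    return started, manager.get_task_count(), manager.failures


started, remaining, failures = asyncio.run(main())
print(started, remaining)  # 2 0
print(failures)  # ["boom: RuntimeError('boom')"]
```
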
Functions
__init__
__init__()

Initialize the BackgroundTaskManager.

Source code in pyagenity/utils/background_task_manager.py
def __init__(self):
    """
    Initialize the BackgroundTaskManager.
    """
    self._tasks: set[asyncio.Task] = set()
    self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}
cancel_all async
cancel_all()

Cancel all tracked background tasks.

Returns:

Type Description
None

None

Source code in pyagenity/utils/background_task_manager.py
async def cancel_all(self) -> None:
    """
    Cancel all tracked background tasks.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Cancelling %d background tasks...", len(self._tasks))

    for task in self._tasks.copy():
        if not task.done():
            task.cancel()

    # Wait a short time for cancellations to process
    await asyncio.sleep(0.1)
create_task
create_task(coro, *, name='background_task', timeout=None, context=None)

Create and track a background asyncio task.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run in the background.

required
name str

Human-readable name for the task.

'background_task'
timeout Optional[float]

Timeout in seconds for the task.

None
context Optional[dict]

Additional context for logging.

None

Returns:

Type Description
Task

The created task.

Source code in pyagenity/utils/background_task_manager.py
def create_task(
    self,
    coro: Coroutine,
    *,
    name: str = "background_task",
    timeout: float | None = None,
    context: dict[str, Any] | None = None,
) -> asyncio.Task:
    """
    Create and track a background asyncio task.

    Args:
        coro (Coroutine): The coroutine to run in the background.
        name (str): Human-readable name for the task.
        timeout (Optional[float]): Timeout in seconds for the task.
        context (Optional[dict]): Additional context for logging.

    Returns:
        asyncio.Task: The created task.
    """
    metrics.counter("background_task_manager.tasks_created").inc()

    task = asyncio.create_task(coro, name=name)
    metadata = TaskMetadata(
        name=name, created_at=time.time(), timeout=timeout, context=context or {}
    )

    self._tasks.add(task)
    self._task_metadata[task] = metadata
    task.add_done_callback(self._task_done_callback)

    # Set up timeout if specified
    if timeout:
        self._setup_timeout(task, timeout)

    logger.debug(
        "Created background task: %s (timeout=%s)",
        name,
        timeout,
        extra={"task_context": context},
    )

    return task
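
The `timeout` argument relies on a watchdog that sleeps for the timeout and then cancels the task if it is still running. A minimal sketch of that idea, independent of the library: `watchdog` is a hypothetical helper, and keeping the `guard` reference is a choice made for this sketch.

```python
import asyncio


async def watchdog(task: asyncio.Task, timeout: float) -> None:
    # Sleep for the timeout, then cancel the task if it is still running.
    await asyncio.sleep(timeout)
    if not task.done():
        task.cancel()


async def main() -> bool:
    slow = asyncio.create_task(asyncio.sleep(10), name="slow")
    # Keep a reference to the watchdog so it is not garbage-collected early.
    guard = asyncio.create_task(watchdog(slow, timeout=0.05))
    try:
        await slow
    except asyncio.CancelledError:
        pass  # the watchdog cancelled the slow task
    await guard
    return slow.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```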
get_task_count
get_task_count()

Get the number of active background tasks.

Source code in pyagenity/utils/background_task_manager.py
def get_task_count(self) -> int:
    """Get the number of active background tasks."""
    return len(self._tasks)
get_task_info
get_task_info()

Get information about all active tasks.

Source code in pyagenity/utils/background_task_manager.py
def get_task_info(self) -> list[dict[str, Any]]:
    """Get information about all active tasks."""
    current_time = time.time()
    return [
        {
            "name": metadata.name,
            "age_seconds": current_time - metadata.created_at,
            "timeout": metadata.timeout,
            "context": metadata.context,
            "done": task.done(),
            "cancelled": task.cancelled() if task.done() else False,
        }
        for task, metadata in self._task_metadata.items()
    ]
wait_for_all async
wait_for_all(timeout=None, return_exceptions=False)

Wait for all tracked background tasks to complete.

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait in seconds.

None
return_exceptions bool

If True, exceptions are returned as results instead of raised.

False

Returns:

Type Description
None

None

Source code in pyagenity/utils/background_task_manager.py
async def wait_for_all(
    self, timeout: float | None = None, return_exceptions: bool = False
) -> None:
    """
    Wait for all tracked background tasks to complete.

    Args:
        timeout (float | None): Maximum time to wait in seconds.
        return_exceptions (bool): If True, exceptions are returned as results instead of raised.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

    try:
        if timeout:
            await asyncio.wait_for(
                asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                timeout=timeout,
            )
        else:
            await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
        logger.info("All background tasks finished.")
    except TimeoutError:
        logger.warning("Timeout waiting for background tasks, some may still be running")
        metrics.counter("background_task_manager.wait_timeout").inc()
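
The timeout path combines `asyncio.wait_for` with `asyncio.gather`. The sketch below illustrates that combination in isolation with two hypothetical tasks. It shows that when the wait times out, tasks that are still pending receive a cancellation request, while tasks that already finished keep their results.

```python
import asyncio


async def main():
    quick = asyncio.create_task(asyncio.sleep(0.01, result="quick"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))
    try:
        await asyncio.wait_for(
            asyncio.gather(quick, slow, return_exceptions=True), timeout=0.1
        )
        timed_out = False
    except asyncio.TimeoutError:
        # wait_for cancels the gather, which in turn cancels pending children.
        timed_out = True
    return timed_out, quick.result(), slow.cancelled()


timed_out, quick_result, slow_cancelled = asyncio.run(main())
print(timed_out, quick_result, slow_cancelled)  # True quick True
```

A task that catches and suppresses `CancelledError` can still outlive the timeout.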
TaskMetadata dataclass

Metadata for tracking background tasks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
context dict[str, Any] | None
created_at float
name str
timeout float | None
Source code in pyagenity/utils/background_task_manager.py
@dataclass
class TaskMetadata:
    """Metadata for tracking background tasks."""

    name: str
    created_at: float
    timeout: float | None = None
    context: dict[str, Any] | None = None
Attributes
context class-attribute instance-attribute
context = None
created_at instance-attribute
created_at
name instance-attribute
name
timeout class-attribute instance-attribute
timeout = None
Functions
__init__
__init__(name, created_at, timeout=None, context=None)

Modules

callable_utils

Utilities for calling sync or async functions in PyAgenity.

This module provides helpers to detect async callables and to invoke functions that may be synchronous or asynchronous, handling thread pool execution and awaitables.

Functions:

Name Description
call_sync_or_async

Call a function that may be sync or async, returning its result.

run_coroutine

Run an async coroutine from a sync context safely.

Functions

call_sync_or_async async
call_sync_or_async(func, *args, **kwargs)

Call a function that may be sync or async, returning its result.

An async callable is awaited directly. A synchronous callable runs in a worker thread (via asyncio.to_thread) so that it does not block the event loop; if it returns an awaitable, that awaitable is awaited before the result is returned.

Parameters:

Name Type Description Default
func
Callable[..., Any]

The function to call.

required
*args

Positional arguments for the function.

()
**kwargs

Keyword arguments for the function.

{}

Returns:

Name Type Description
Any Any

The result of the function call, awaited if necessary.

Source code in pyagenity/utils/callable_utils.py
async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    """
    Call a function that may be sync or async, returning its result.

    If the function is synchronous, it runs in a thread pool to avoid blocking
    the event loop. If the result is awaitable, it is awaited before returning.

    Args:
        func (Callable[..., Any]): The function to call.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        Any: The result of the function call, awaited if necessary.
    """
    if _is_async_callable(func):
        return await func(*args, **kwargs)

    # Call sync function in a thread pool
    result = await asyncio.to_thread(func, *args, **kwargs)
    # If the result is awaitable, await it
    if inspect.isawaitable(result):
        return await result
    return result
run_coroutine
run_coroutine(func)

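Run a coroutine object to completion from synchronous code and return its result. If no event loop is running in the current thread, asyncio.run is used; otherwise the coroutine is submitted to the running loop with asyncio.run_coroutine_threadsafe and the call blocks until the result is available.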

Source code in pyagenity/utils/callable_utils.py
def run_coroutine(func: Coroutine) -> Any:
    """Run an async coroutine from a sync context safely."""
    # Always try to get/create an event loop and use thread-safe execution
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running, create one
        return asyncio.run(func)

    # Loop is running, use thread-safe execution
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()

callbacks

Callback system for PyAgenity.

This module provides a comprehensive callback framework that allows users to define their own validation logic and custom behavior at key points in the execution flow:

  • before_invoke: Called before AI/TOOL/MCP invocation for input validation and modification
  • after_invoke: Called after AI/TOOL/MCP invocation for output validation and modification
  • on_error: Called when errors occur during invocation for error handling and logging

The system is generic and type-safe, supporting different callback types for different invocation contexts.

Classes:

Name Description
AfterInvokeCallback

Abstract base class for after_invoke callbacks.

BeforeInvokeCallback

Abstract base class for before_invoke callbacks.

CallbackContext

Context information passed to callbacks.

CallbackManager

Manages registration and execution of callbacks for different invocation types.

InvocationType

Types of invocations that can trigger callbacks.

OnErrorCallback

Abstract base class for on_error callbacks.

Functions:

Name Description
register_after_invoke

Register an after_invoke callback on the global callback manager.

register_before_invoke

Register a before_invoke callback on the global callback manager.

register_on_error

Register an on_error callback on the global callback manager.

Attributes:

Name Type Description
AfterInvokeCallbackType
BeforeInvokeCallbackType
OnErrorCallbackType
default_callback_manager
logger

Attributes

AfterInvokeCallbackType module-attribute
AfterInvokeCallbackType = Union[AfterInvokeCallback[Any, Any], Callable[[CallbackContext, Any, Any], Union[Any, Awaitable[Any]]]]
BeforeInvokeCallbackType module-attribute
BeforeInvokeCallbackType = Union[BeforeInvokeCallback[Any, Any], Callable[[CallbackContext, Any], Union[Any, Awaitable[Any]]]]
OnErrorCallbackType module-attribute
OnErrorCallbackType = Union[OnErrorCallback, Callable[[CallbackContext, Any, Exception], Union[Any | None, Awaitable[Any | None]]]]
default_callback_manager module-attribute
default_callback_manager = CallbackManager()
logger module-attribute
logger = getLogger(__name__)
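Each of the three type aliases accepts either a callback object or a plain callable that returns a value or an awaitable. The sketch below shows one way such a callable can be dispatched, using the same `hasattr(result, "__await__")` check that appears in the `CallbackManager` source. The helper `call_maybe_async` and the two sample callbacks are hypothetical names used only for illustration.

```python
import asyncio


async def call_maybe_async(callback, *args):
    """Call `callback` and await its result only when the result is awaitable."""
    result = callback(*args)
    if hasattr(result, "__await__"):
        return await result
    return result


def sync_upper(context, data):
    # A plain function satisfies the callable half of the alias.
    return data.upper()


async def async_reverse(context, data):
    # A coroutine function satisfies it too; its result is awaited.
    return data[::-1]
```

Both `sync_upper` and `async_reverse` can be passed to `call_maybe_async` without the caller knowing which kind it received.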

Classes

AfterInvokeCallback

Bases: ABC

Abstract base class for after_invoke callbacks.

Called after the AI model, tool, or MCP function is invoked. Allows for output validation and modification.

Methods:

Name Description
__call__

Execute the after_invoke callback.

Source code in pyagenity/utils/callbacks.py
class AfterInvokeCallback[T, R](ABC):
    """Abstract base class for after_invoke callbacks.

    Called after the AI model, tool, or MCP function is invoked.
    Allows for output validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
        """Execute the after_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The original input data that was sent
            output_data: The output data returned from the invocation

        Returns:
            Modified output data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data, output_data)

Execute the after_invoke callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data T

The original input data that was sent

required
output_data Any

The output data returned from the invocation

required

Returns:

Type Description
Any | R

Modified output data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
    """Execute the after_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The original input data that was sent
        output_data: The output data returned from the invocation

    Returns:
        Modified output data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
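A class-based callback is an object with an async `__call__`. The following sketch shows the pattern with a local stand-in base class, so that it runs without PyAgenity installed; `AfterInvokeStandIn` and `MaxLength` are hypothetical names, and in real code the subclass would presumably derive from `AfterInvokeCallback` instead.

```python
import asyncio
from abc import ABC, abstractmethod


class AfterInvokeStandIn(ABC):
    """Local stand-in with the same __call__ shape as AfterInvokeCallback."""

    @abstractmethod
    async def __call__(self, context, input_data, output_data):
        ...


class MaxLength(AfterInvokeStandIn):
    """Validate that the output is text and truncate it to `limit` characters."""

    def __init__(self, limit):
        self.limit = limit

    async def __call__(self, context, input_data, output_data):
        if not isinstance(output_data, str):
            # Raising signals a validation failure to the caller.
            raise TypeError("expected text output")
        return output_data[: self.limit]
```

Because `__call__` is abstract, the base class itself cannot be instantiated; only subclasses that implement it can.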
BeforeInvokeCallback

Bases: ABC

Abstract base class for before_invoke callbacks.

Called before the AI model, tool, or MCP function is invoked. Allows for input validation and modification.

Methods:

Name Description
__call__

Execute the before_invoke callback.

Source code in pyagenity/utils/callbacks.py
class BeforeInvokeCallback[T, R](ABC):
    """Abstract base class for before_invoke callbacks.

    Called before the AI model, tool, or MCP function is invoked.
    Allows for input validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
        """Execute the before_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The input data about to be sent to the invocation

        Returns:
            Modified input data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data)

Execute the before_invoke callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data T

The input data about to be sent to the invocation

required

Returns:

Type Description
T | R

Modified input data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
    """Execute the before_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The input data about to be sent to the invocation

    Returns:
        Modified input data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
CallbackContext dataclass

Context information passed to callbacks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
function_name str | None
invocation_type InvocationType
metadata dict[str, Any] | None
node_name str
Source code in pyagenity/utils/callbacks.py
@dataclass
class CallbackContext:
    """Context information passed to callbacks."""

    invocation_type: InvocationType
    node_name: str
    function_name: str | None = None
    metadata: dict[str, Any] | None = None
Attributes
function_name class-attribute instance-attribute
function_name = None
invocation_type instance-attribute
invocation_type
metadata class-attribute instance-attribute
metadata = None
node_name instance-attribute
node_name
Functions
__init__
__init__(invocation_type, node_name, function_name=None, metadata=None)
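Since `function_name` and `metadata` both default to `None`, a callback that reads them should guard against missing values. The sketch below redeclares the dataclass and enum locally so that it is self-contained; `describe` and the `request_id` metadata key are hypothetical and not part of the library.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class InvocationType(Enum):
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


@dataclass
class CallbackContext:
    """Local copy of the fields shown above, for illustration only."""

    invocation_type: InvocationType
    node_name: str
    function_name: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None


def describe(ctx: CallbackContext) -> str:
    """Build a log-friendly label without assuming optional fields are set."""
    meta = ctx.metadata or {}  # metadata may be None
    target = ctx.function_name or "<unnamed>"
    request_id = meta.get("request_id", "n/a")  # hypothetical key
    return f"{ctx.invocation_type.value}:{ctx.node_name}:{target}:{request_id}"
```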
CallbackManager

Manages registration and execution of callbacks for different invocation types.

Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.

Methods:

Name Description
__init__

Initialize the CallbackManager with empty callback registries.

clear_callbacks

Clear callbacks for a specific invocation type or all types.

execute_after_invoke

Execute all after_invoke callbacks for the given context.

execute_before_invoke

Execute all before_invoke callbacks for the given context.

execute_on_error

Execute all on_error callbacks for the given context.

get_callback_counts

Get count of registered callbacks by type for debugging.

register_after_invoke

Register an after_invoke callback for a specific invocation type.

register_before_invoke

Register a before_invoke callback for a specific invocation type.

register_on_error

Register an on_error callback for a specific invocation type.

Source code in pyagenity/utils/callbacks.py
class CallbackManager:
    """
    Manages registration and execution of callbacks for different invocation types.

    Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.
    """

    def __init__(self):
        """
        Initialize the CallbackManager with empty callback registries.
        """
        self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }

    def register_before_invoke(
        self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
    ) -> None:
        """
        Register a before_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (BeforeInvokeCallbackType): The callback to register.
        """
        self._before_callbacks[invocation_type].append(callback)

    def register_after_invoke(
        self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
    ) -> None:
        """
        Register an after_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (AfterInvokeCallbackType): The callback to register.
        """
        self._after_callbacks[invocation_type].append(callback)

    def register_on_error(
        self, invocation_type: InvocationType, callback: OnErrorCallbackType
    ) -> None:
        """
        Register an on_error callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (OnErrorCallbackType): The callback to register.
        """
        self._error_callbacks[invocation_type].append(callback)

    async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
        """
        Execute all before_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data to be validated or modified.

        Returns:
            Any: The modified input data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_data = input_data

        for callback in self._before_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, BeforeInvokeCallback):
                    current_data = await callback(context, current_data)
                elif callable(callback):
                    result = callback(context, current_data)
                    if hasattr(result, "__await__"):
                        current_data = await result
                    else:
                        current_data = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_data

    async def execute_after_invoke(
        self, context: CallbackContext, input_data: Any, output_data: Any
    ) -> Any:
        """
        Execute all after_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The original input data sent to the invocation.
            output_data (Any): The output data returned from the invocation.

        Returns:
            Any: The modified output data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_output = output_data

        for callback in self._after_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, AfterInvokeCallback):
                    current_output = await callback(context, input_data, current_output)
                elif callable(callback):
                    result = callback(context, input_data, current_output)
                    if hasattr(result, "__await__"):
                        current_output = await result
                    else:
                        current_output = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_output

    async def execute_on_error(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Message | None:
        """
        Execute all on_error callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data that caused the error.
            error (Exception): The exception that occurred.

        Returns:
            Message | None: Recovery value from callbacks, or None if not handled.
        """
        recovery_value = None

        for callback in self._error_callbacks[context.invocation_type]:
            try:
                result = None
                if isinstance(callback, OnErrorCallback):
                    result = await callback(context, input_data, error)
                elif callable(callback):
                    result = callback(context, input_data, error)
                    if hasattr(result, "__await__"):
                        result = await result  # type: ignore

                if isinstance(result, Message) or result is None:
                    recovery_value = result
            except Exception as exc:
                logger.exception("Error callback failed: %s", exc)
                continue

        return recovery_value

    def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
        """
        Clear callbacks for a specific invocation type or all types.

        Args:
            invocation_type (InvocationType | None): The invocation type to clear, or None for all.
        """
        if invocation_type:
            self._before_callbacks[invocation_type].clear()
            self._after_callbacks[invocation_type].clear()
            self._error_callbacks[invocation_type].clear()
        else:
            for inv_type in InvocationType:
                self._before_callbacks[inv_type].clear()
                self._after_callbacks[inv_type].clear()
                self._error_callbacks[inv_type].clear()

    def get_callback_counts(self) -> dict[str, dict[str, int]]:
        """
        Get count of registered callbacks by type for debugging.

        Returns:
            dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
        """
        return {
            inv_type.value: {
                "before_invoke": len(self._before_callbacks[inv_type]),
                "after_invoke": len(self._after_callbacks[inv_type]),
                "on_error": len(self._error_callbacks[inv_type]),
            }
            for inv_type in InvocationType
        }
Functions
__init__
__init__()

Initialize the CallbackManager with empty callback registries.

Source code in pyagenity/utils/callbacks.py
def __init__(self):
    """
    Initialize the CallbackManager with empty callback registries.
    """
    self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
clear_callbacks
clear_callbacks(invocation_type=None)

Clear callbacks for a specific invocation type or all types.

Parameters:

Name Type Description Default
invocation_type InvocationType | None

The invocation type to clear, or None for all.

None
Source code in pyagenity/utils/callbacks.py
def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
    """
    Clear callbacks for a specific invocation type or all types.

    Args:
        invocation_type (InvocationType | None): The invocation type to clear, or None for all.
    """
    if invocation_type:
        self._before_callbacks[invocation_type].clear()
        self._after_callbacks[invocation_type].clear()
        self._error_callbacks[invocation_type].clear()
    else:
        for inv_type in InvocationType:
            self._before_callbacks[inv_type].clear()
            self._after_callbacks[inv_type].clear()
            self._error_callbacks[inv_type].clear()
execute_after_invoke async
execute_after_invoke(context, input_data, output_data)

Execute all after_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The original input data sent to the invocation.

required
output_data Any

The output data returned from the invocation.

required

Returns:

Type Description
Any

The modified output data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in pyagenity/utils/callbacks.py
async def execute_after_invoke(
    self, context: CallbackContext, input_data: Any, output_data: Any
) -> Any:
    """
    Execute all after_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The original input data sent to the invocation.
        output_data (Any): The output data returned from the invocation.

    Returns:
        Any: The modified output data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_output = output_data

    for callback in self._after_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, AfterInvokeCallback):
                current_output = await callback(context, input_data, current_output)
            elif callable(callback):
                result = callback(context, input_data, current_output)
                if hasattr(result, "__await__"):
                    current_output = await result
                else:
                    current_output = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_output
execute_before_invoke async
execute_before_invoke(context, input_data)

Execute all before_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The input data to be validated or modified.

required

Returns:

Type Description
Any

The modified input data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in pyagenity/utils/callbacks.py
async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
    """
    Execute all before_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data to be validated or modified.

    Returns:
        Any: The modified input data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_data = input_data

    for callback in self._before_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, BeforeInvokeCallback):
                current_data = await callback(context, current_data)
            elif callable(callback):
                result = callback(context, current_data)
                if hasattr(result, "__await__"):
                    current_data = await result
                else:
                    current_data = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_data
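Two details of the listing above are easy to miss: each callback receives the value returned by the previous one, and when a callback raises, the error handlers receive the original `input_data` rather than the partially transformed value. The sketch below reproduces that control flow in a self-contained form; `run_before_chain` and the sample callbacks are hypothetical stand-ins, and error handling is reduced to a single synchronous `on_error` function.

```python
import asyncio


async def run_before_chain(callbacks, context, input_data, on_error):
    """Thread `input_data` through `callbacks`, mirroring the listing's flow."""
    current = input_data
    for callback in callbacks:
        try:
            result = callback(context, current)
            if hasattr(result, "__await__"):
                current = await result
            else:
                current = result
        except Exception as exc:
            # The original input is reported, not the intermediate value.
            on_error(context, input_data, exc)
            raise
    return current


def trim(context, text):
    return text.strip()


async def shout(context, text):
    return text.upper()


def reject(context, text):
    raise ValueError("blocked")
```

With `[trim, shout]` the chain yields the trimmed, upper-cased text; with `[trim, reject]` the exception propagates after `on_error` has seen the untrimmed input.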
execute_on_error async
execute_on_error(context, input_data, error)

Execute all on_error callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The input data that caused the error.

required
error Exception

The exception that occurred.

required

Returns:

Type Description
Message | None

Recovery value from the last callback that returned a Message or None; None if no callback handled the error.

Source code in pyagenity/utils/callbacks.py
async def execute_on_error(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Message | None:
    """
    Execute all on_error callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data that caused the error.
        error (Exception): The exception that occurred.

    Returns:
        Message | None: Recovery value from callbacks, or None if not handled.
    """
    recovery_value = None

    for callback in self._error_callbacks[context.invocation_type]:
        try:
            result = None
            if isinstance(callback, OnErrorCallback):
                result = await callback(context, input_data, error)
            elif callable(callback):
                result = callback(context, input_data, error)
                if hasattr(result, "__await__"):
                    result = await result  # type: ignore

            if isinstance(result, Message) or result is None:
                recovery_value = result
        except Exception as exc:
            logger.exception("Error callback failed: %s", exc)
            continue

    return recovery_value
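Reading the listing above, the recovery value is whatever the last callback returning a `Message` or `None` produced: results of other types are ignored, a failing callback is skipped, and a later `None` replaces an earlier `Message`. The following self-contained sketch reproduces that selection rule so it can be experimented with; `Message` here is a local stand-in dataclass, and `collect_recovery` is a hypothetical name.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Message:
    """Local stand-in for the library's Message type."""

    text: str


async def collect_recovery(callbacks, context, input_data, error):
    """Apply the same selection rule as the listing: last Message-or-None wins."""
    recovery = None
    for callback in callbacks:
        try:
            result = callback(context, input_data, error)
            if hasattr(result, "__await__"):
                result = await result
            if isinstance(result, Message) or result is None:
                recovery = result  # a later None overwrites an earlier Message
        except Exception:
            continue  # a failing error callback does not stop the others
    return recovery
```

Under this rule, ordering matters: a logging-only callback that returns `None` and is registered after a recovery callback will discard the recovery value.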
get_callback_counts
get_callback_counts()

Get count of registered callbacks by type for debugging.

Returns:

Type Description
dict[str, dict[str, int]]

Counts keyed by invocation type value ("ai", "tool", "mcp"); each entry maps "before_invoke", "after_invoke", and "on_error" to the number of registered callbacks.

Source code in pyagenity/utils/callbacks.py
def get_callback_counts(self) -> dict[str, dict[str, int]]:
    """
    Get count of registered callbacks by type for debugging.

    Returns:
        dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
    """
    return {
        inv_type.value: {
            "before_invoke": len(self._before_callbacks[inv_type]),
            "after_invoke": len(self._after_callbacks[inv_type]),
            "on_error": len(self._error_callbacks[inv_type]),
        }
        for inv_type in InvocationType
    }
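To make the shape of the returned dictionary concrete, here is a small self-contained sketch of a registry that produces the same nested structure and supports the same "clear one type or all types" behavior. `MiniRegistry` is a simplified, hypothetical stand-in (it stores all phases in one dictionary) and is not the library's implementation.

```python
from enum import Enum


class InvocationType(Enum):
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


class MiniRegistry:
    """Simplified stand-in: one list of callbacks per phase and invocation type."""

    def __init__(self):
        self._registries = {
            phase: {inv_type: [] for inv_type in InvocationType}
            for phase in ("before_invoke", "after_invoke", "on_error")
        }

    def register(self, phase, invocation_type, callback):
        self._registries[phase][invocation_type].append(callback)

    def clear(self, invocation_type=None):
        # None clears every invocation type, as in clear_callbacks.
        targets = [invocation_type] if invocation_type else list(InvocationType)
        for per_type in self._registries.values():
            for inv_type in targets:
                per_type[inv_type].clear()

    def counts(self):
        # Outer keys are enum values; inner keys are phase names.
        return {
            inv_type.value: {
                phase: len(per_type[inv_type])
                for phase, per_type in self._registries.items()
            }
            for inv_type in InvocationType
        }
```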
register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback AfterInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_after_invoke(
    self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    self._after_callbacks[invocation_type].append(callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback BeforeInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_before_invoke(
    self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    self._before_callbacks[invocation_type].append(callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback OnErrorCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_on_error(
    self, invocation_type: InvocationType, callback: OnErrorCallbackType
) -> None:
    """
    Register an on_error callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    self._error_callbacks[invocation_type].append(callback)
InvocationType

Bases: Enum

Types of invocations that can trigger callbacks.

Attributes:

Name Type Description
AI
MCP
TOOL
Source code in pyagenity/utils/callbacks.py
class InvocationType(Enum):
    """Types of invocations that can trigger callbacks."""

    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"
Attributes
AI class-attribute instance-attribute
AI = 'ai'
MCP class-attribute instance-attribute
MCP = 'mcp'
TOOL class-attribute instance-attribute
TOOL = 'tool'
OnErrorCallback

Bases: ABC

Abstract base class for on_error callbacks.

Called when an error occurs during invocation. Allows for error handling and logging.

Methods:

Name Description
__call__

Execute the on_error callback.

Source code in pyagenity/utils/callbacks.py
class OnErrorCallback(ABC):
    """Abstract base class for on_error callbacks.

    Called when an error occurs during invocation.
    Allows for error handling and logging.
    """

    @abstractmethod
    async def __call__(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Any | None:
        """Execute the on_error callback.

        Args:
            context: Context information about the invocation
            input_data: The input data that caused the error
            error: The exception that occurred

        Returns:
            Optional recovery value or None to re-raise the error

        Raises:
            Exception: If error handling fails or if the error should be re-raised
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data, error)

Execute the on_error callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data Any

The input data that caused the error

required
error Exception

The exception that occurred

required

Returns:

Type Description
Any | None

Optional recovery value or None to re-raise the error

Raises:

Type Description
Exception

If error handling fails or if the error should be re-raised

Source code in pyagenity/utils/callbacks.py
@abstractmethod
async def __call__(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Any | None:
    """Execute the on_error callback.

    Args:
        context: Context information about the invocation
        input_data: The input data that caused the error
        error: The exception that occurred

    Returns:
        Optional recovery value or None to re-raise the error

    Raises:
        Exception: If error handling fails or if the error should be re-raised
    """
    ...

Functions

register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback AfterInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_after_invoke(
    invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_after_invoke(invocation_type, callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback BeforeInvokeCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_before_invoke(
    invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_before_invoke(invocation_type, callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback OnErrorCallbackType

The callback to register.

required
Source code in pyagenity/utils/callbacks.py
def register_on_error(invocation_type: InvocationType, callback: OnErrorCallbackType) -> None:
    """
    Register an on_error callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    default_callback_manager.register_on_error(invocation_type, callback)

command

Command API for AgentGraph in PyAgenity.

This module provides the Command class, which allows nodes to combine state updates with control flow, similar to LangGraph's Command API. Nodes can update agent state and direct graph execution to specific nodes or graphs.

Classes:

Name Description
Command

Command object that combines state updates with control flow.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound='AgentState')

Classes

Command

Command object that combines state updates with control flow.

Allows nodes to update agent state and direct graph execution to specific nodes or graphs. Similar to LangGraph's Command API.
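As a sketch of how a node might use this, the example below returns a command that carries both a state update and the name of the next node. To stay self-contained it defines a local `Command` stand-in with the same constructor arguments as documented here. The `END` value, the `"writer"` node name, and `review_node` are hypothetical; the real `END` constant comes from the library's constants module and its value is not assumed here.

```python
class Command:
    """Local stand-in with the same constructor arguments as the documented class."""

    PARENT = "PARENT"

    def __init__(self, update=None, goto=None, graph=None, state=None):
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state


END = "__end__"  # hypothetical placeholder for the library's END constant


def review_node(draft: str) -> Command:
    """Decide the next step and attach a message describing the decision."""
    if len(draft) < 20:
        # Too short: send control back to a (hypothetical) "writer" node.
        return Command(update="Draft too short, revising.", goto="writer")
    return Command(update="Draft accepted.", goto=END)
```

Returning one object that holds both the update and the routing decision keeps the node's state change and its control-flow choice together, instead of splitting them between the node and a separate conditional edge.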

Methods:

Name Description
__init__

Initialize a Command object.

__repr__

Return a string representation of the Command object.

Attributes:

Name Type Description
PARENT
goto
graph
state
update
Source code in pyagenity/utils/command.py
class Command[StateT: AgentState]:
    """
    Command object that combines state updates with control flow.

    Allows nodes to update agent state and direct graph execution to specific nodes or graphs.
    Similar to LangGraph's Command API.
    """

    PARENT = "PARENT"

    def __init__(
        self,
        update: Union["StateT", None, Message, str, "BaseConverter"] = None,
        goto: str | None = None,
        graph: str | None = None,
        state: StateT | None = None,
    ):
        """
        Initialize a Command object.

        Args:
            update (StateT | None | Message | str | BaseConverter): State update to apply.
            goto (str | None): Next node to execute (node name or END).
            graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
            state (StateT | None): Optional agent state to attach.
        """
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state

    def __repr__(self) -> str:
        """
        Return a string representation of the Command object.

        Returns:
            str: String representation of the Command.
        """
        return (
            f"Command(update={self.update}, goto={self.goto}, \n"
            f" graph={self.graph}, state={self.state})"
        )
Attributes
PARENT class-attribute instance-attribute
PARENT = 'PARENT'
goto instance-attribute
goto = goto
graph instance-attribute
graph = graph
state instance-attribute
state = state
update instance-attribute
update = update
Functions
__init__
__init__(update=None, goto=None, graph=None, state=None)

Initialize a Command object.

Parameters:

Name Type Description Default
update StateT | None | Message | str | BaseConverter

State update to apply.

None
goto str | None

Next node to execute (node name or END).

None
graph str | None

Which graph to navigate to (None for current, PARENT for parent).

None
state StateT | None

Optional agent state to attach.

None
Source code in pyagenity/utils/command.py
def __init__(
    self,
    update: Union["StateT", None, Message, str, "BaseConverter"] = None,
    goto: str | None = None,
    graph: str | None = None,
    state: StateT | None = None,
):
    """
    Initialize a Command object.

    Args:
        update (StateT | None | Message | str | BaseConverter): State update to apply.
        goto (str | None): Next node to execute (node name or END).
        graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
        state (StateT | None): Optional agent state to attach.
    """
    self.update = update
    self.goto = goto
    self.graph = graph
    self.state = state
__repr__
__repr__()

Return a string representation of the Command object.

Returns:

Name Type Description
str str

String representation of the Command.

Source code in pyagenity/utils/command.py
def __repr__(self) -> str:
    """
    Return a string representation of the Command object.

    Returns:
        str: String representation of the Command.
    """
    return (
        f"Command(update={self.update}, goto={self.goto}, \n"
        f" graph={self.graph}, state={self.state})"
    )
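
A minimal sketch of how a node might use Command to pair a state update with routing. To keep the snippet runnable without PyAgenity installed, `Command` below is a local stand-in that mirrors the constructor shown above, and `END` repeats the `'__end__'` value from the constants module. The `review_node` function and the `"retry"` node name are hypothetical.

```python
END = "__end__"  # same value as the END constant documented in the constants module


class Command:
    """Local stand-in mirroring the constructor documented above."""

    PARENT = "PARENT"

    def __init__(self, update=None, goto=None, graph=None, state=None):
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state


def review_node(draft: str) -> Command:
    """Hypothetical node: finish when a draft exists, otherwise retry."""
    if draft.strip():
        # Apply the update and end the current graph.
        return Command(update=draft, goto=END)
    # No usable draft: route to a hypothetical "retry" node in the parent graph.
    return Command(goto="retry", graph=Command.PARENT)


done = review_node("Final answer")
again = review_node("   ")
print(done.goto, again.goto, again.graph)
```

Leaving `graph` as None keeps routing inside the current graph, while `Command.PARENT` targets the parent, as described in the parameter table above.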

constants

Constants and enums for PyAgenity agent graph execution and messaging.

This module defines special node names, message storage levels, execution states, and response granularity options for agent workflows.

Classes:

Name Description
ExecutionState

Graph execution states for agent workflows.

ResponseGranularity

Response granularity options for agent graph outputs.

StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
END Literal['__end__']
START Literal['__start__']

Attributes

END module-attribute
END = '__end__'
START module-attribute
START = '__start__'

Classes

ExecutionState

Bases: StrEnum

Graph execution states for agent workflows.

Values

RUNNING: Execution is in progress.
PAUSED: Execution is paused.
COMPLETED: Execution completed successfully.
ERROR: Execution encountered an error.
INTERRUPTED: Execution was interrupted.
ABORTED: Execution was aborted.
IDLE: Execution is idle.

Attributes:

Name Type Description
ABORTED
COMPLETED
ERROR
IDLE
INTERRUPTED
PAUSED
RUNNING
Source code in pyagenity/utils/constants.py
class ExecutionState(StrEnum):
    """
    Graph execution states for agent workflows.

    Values:
        RUNNING: Execution is in progress.
        PAUSED: Execution is paused.
        COMPLETED: Execution completed successfully.
        ERROR: Execution encountered an error.
        INTERRUPTED: Execution was interrupted.
        ABORTED: Execution was aborted.
        IDLE: Execution is idle.
    """

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"
Attributes
ABORTED class-attribute instance-attribute
ABORTED = 'aborted'
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
IDLE class-attribute instance-attribute
IDLE = 'idle'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PAUSED class-attribute instance-attribute
PAUSED = 'paused'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
ResponseGranularity

Bases: StrEnum

Response granularity options for agent graph outputs.

Values

FULL: State, latest messages.
PARTIAL: Context, summary, latest messages.
LOW: Only latest messages.

Attributes:

Name Type Description
FULL
LOW
PARTIAL
Source code in pyagenity/utils/constants.py
class ResponseGranularity(StrEnum):
    """
    Response granularity options for agent graph outputs.

    Values:
        FULL: State, latest messages.
        PARTIAL: Context, summary, latest messages.
        LOW: Only latest messages.
    """

    FULL = "full"
    PARTIAL = "partial"
    LOW = "low"
Attributes
FULL class-attribute instance-attribute
FULL = 'full'
LOW class-attribute instance-attribute
LOW = 'low'
PARTIAL class-attribute instance-attribute
PARTIAL = 'partial'
StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
ALL

Save everything including tool calls.

MEDIUM

Only AI and human messages.

LOW

Only first human and last AI message.

Source code in pyagenity/utils/constants.py
class StorageLevel:
    """
    Message storage levels for agent state persistence.

    Attributes:
        ALL: Save everything including tool calls.
        MEDIUM: Only AI and human messages.
        LOW: Only first human and last AI message.
    """

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"
Attributes
ALL class-attribute instance-attribute
ALL = 'all'
LOW class-attribute instance-attribute
LOW = 'low'
MEDIUM class-attribute instance-attribute
MEDIUM = 'medium'
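
StorageLevel is a plain class of string constants rather than an enum. As an illustration of the three levels described above, here is a hypothetical filter over simple role-tagged dicts. The `filter_for_storage` function, the message shape, and the role names `user`, `assistant`, and `tool` are all assumptions made for this sketch; it is not PyAgenity's persistence logic.

```python
class StorageLevel:
    """Local copy of the constants documented above."""

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"


def filter_for_storage(messages: list[dict], level: str) -> list[dict]:
    """Hypothetical filter: keep what each level is described as storing."""
    if level == StorageLevel.ALL:
        return list(messages)
    # Drop tool traffic; keep only human and AI turns.
    chat = [m for m in messages if m["role"] in ("user", "assistant")]
    if level == StorageLevel.MEDIUM:
        return chat
    if level == StorageLevel.LOW:
        first_human = next((m for m in chat if m["role"] == "user"), None)
        last_ai = next((m for m in reversed(chat) if m["role"] == "assistant"), None)
        return [m for m in (first_human, last_ai) if m is not None]
    raise ValueError(f"unknown storage level: {level!r}")


history = [
    {"role": "user", "content": "Weather in Oslo?"},
    {"role": "assistant", "content": "Checking."},
    {"role": "tool", "content": "4 degrees, cloudy"},
    {"role": "assistant", "content": "It is 4 degrees and cloudy."},
]
kept = {lvl: len(filter_for_storage(history, lvl)) for lvl in ("all", "medium", "low")}
print(kept)
```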

converter

Message conversion utilities for PyAgenity agent graphs.

This module provides helpers to convert Message objects and agent state into dicts suitable for LLM and tool invocation payloads.

Functions:

Name Description
convert_messages

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Functions

convert_messages
convert_messages(system_prompts, state=None, extra_messages=None)

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Parameters:

Name Type Description Default
system_prompts
list[dict[str, Any]]

List of system prompt dicts.

required
state
AgentState | None

Optional agent state containing context and summary.

None
extra_messages
list[Message] | None

Optional extra messages to include.

None

Returns:

Type Description
list[dict[str, Any]]

List of message dicts for payloads.

Raises:

Type Description
ValueError

If system_prompts is None.

Source code in pyagenity/utils/converter.py
def convert_messages(
    system_prompts: list[dict[str, Any]],
    state: Union["AgentState", None] = None,
    extra_messages: list[Message] | None = None,
) -> list[dict[str, Any]]:
    """
    Convert system prompts, agent state, and extra messages to a list of dicts for
    LLM/tool payloads.

    Args:
        system_prompts (list[dict[str, Any]]): List of system prompt dicts.
        state (AgentState | None): Optional agent state containing context and summary.
        extra_messages (list[Message] | None): Optional extra messages to include.

    Returns:
        list[dict[str, Any]]: List of message dicts for payloads.

    Raises:
        ValueError: If system_prompts is None.
    """
    if system_prompts is None:
        logger.error("System prompts are None")
        raise ValueError("System prompts cannot be None")

    res = []
    res += system_prompts

    if state and state.context_summary:
        summary = {
            "role": "assistant",
            "content": state.context_summary if state.context_summary else "",
        }
        res.append(summary)

    if state and state.context:
        for msg in state.context:
            res.append(_convert_dict(msg))

    if extra_messages:
        for msg in extra_messages:
            res.append(_convert_dict(msg))

    logger.debug("Number of Converted messages: %s", len(res))
    return res

id_generator

ID Generator Module

This module provides various strategies for generating unique identifiers. Each generator implements the BaseIDGenerator interface and specifies the type and size of IDs it produces.

Classes:

Name Description
AsyncIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

BaseIDGenerator

Abstract base class for ID generation strategies.

BigIntIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

DefaultIDGenerator

Default ID generator that returns empty strings.

HexIDGenerator

ID generator that produces hexadecimal strings.

IDType

Enumeration of supported ID types.

IntIDGenerator

ID generator that produces 32-bit random integers.

ShortIDGenerator

ID generator that produces short alphanumeric strings.

TimestampIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

UUIDGenerator

ID generator that produces UUID version 4 strings.

Classes

AsyncIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx). This generator provides an asynchronous interface for generating UUIDs.

Methods:

Name Description
generate

Asynchronously generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in pyagenity/utils/id_generator.py
class AsyncIDGenerator(BaseIDGenerator):
    """
    ID generator that produces UUID version 4 strings asynchronously.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    This generator provides an asynchronous interface for generating UUIDs.
    """

    @property
    def id_type(self) -> IDType:
        """
        Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING).
        """
        return IDType.STRING

    async def generate(self) -> str:
        """
        Asynchronously generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        # Simulate async operation (e.g., if fetching from an external service)
        return str(uuid.uuid4())
Attributes
id_type property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING).

Functions
generate async
generate()

Asynchronously generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in pyagenity/utils/id_generator.py
async def generate(self) -> str:
    """
    Asynchronously generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    # Simulate async operation (e.g., if fetching from an external service)
    return str(uuid.uuid4())
BaseIDGenerator

Bases: ABC

Abstract base class for ID generation strategies.

All ID generators must implement the id_type property and generate method.

Methods:

Name Description
generate

Generate a new unique ID.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in pyagenity/utils/id_generator.py
class BaseIDGenerator(ABC):
    """Abstract base class for ID generation strategies.

    All ID generators must implement the id_type property and generate method.
    """

    @property
    @abstractmethod
    def id_type(self) -> IDType:
        """Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING, INTEGER, or BIGINT).
        """
        raise NotImplementedError("id_type method must be implemented")

    @abstractmethod
    def generate(self) -> str | int | Awaitable[str | int]:
        """Generate a new unique ID.

        Returns:
            str | int: A new unique identifier of the appropriate type.
        """
        raise NotImplementedError("generate method must be implemented")
Attributes
id_type abstractmethod property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING, INTEGER, or BIGINT).

Functions
generate abstractmethod
generate()

Generate a new unique ID.

Returns:

Type Description
str | int | Awaitable[str | int]

A new unique identifier of the appropriate type, or an awaitable that resolves to one for asynchronous generators.

Source code in pyagenity/utils/id_generator.py
@abstractmethod
def generate(self) -> str | int | Awaitable[str | int]:
    """Generate a new unique ID.

    Returns:
        str | int: A new unique identifier of the appropriate type.
    """
    raise NotImplementedError("generate method must be implemented")
BigIntIDGenerator

Bases: BaseIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

Generates IDs by multiplying current Unix timestamp by 1e9, resulting in large integers that are sortable by creation time. Typical size is 19-20 digits.

Methods:

Name Description
generate

Generate a new big integer ID based on current nanoseconds.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class BigIntIDGenerator(BaseIDGenerator):
    """ID generator that produces big integer IDs based on current time in nanoseconds.

    Generates IDs by multiplying current Unix timestamp by 1e9, resulting in
    large integers that are sortable by creation time. Typical size is 19-20 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.BIGINT

    def generate(self) -> int:
        """Generate a new big integer ID based on current nanoseconds.

        Returns:
            int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
        """
        # Use current time in nanoseconds for higher uniqueness
        return int(time.time() * 1_000_000_000)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new big integer ID based on current nanoseconds.

Returns:

Name Type Description
int int

A large integer (19-20 digits) representing nanoseconds since Unix epoch.

Source code in pyagenity/utils/id_generator.py
def generate(self) -> int:
    """Generate a new big integer ID based on current nanoseconds.

    Returns:
        int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
    """
    # Use current time in nanoseconds for higher uniqueness
    return int(time.time() * 1_000_000_000)
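
One caveat worth checking: `time.time()` returns a float, and a double cannot represent present-day epoch values at nanosecond resolution. The sketch below quantifies this with the standard library and shows `time.time_ns()`, which returns an integer nanosecond count directly. Using `time_ns()` is a suggestion made here, not what the generator above does.

```python
import math
import time

now = time.time()

# Gap between adjacent doubles near the current epoch value, in seconds.
# For timestamps between 2**30 and 2**31 seconds this is 2**-22 s, about 238 ns.
step_seconds = math.ulp(now)

# After scaling to nanoseconds the float lies between 2**60 and 2**61,
# where adjacent doubles are 256 apart, so the result is a multiple of 256.
float_based = int(now * 1_000_000_000)

# Integer nanoseconds straight from the clock, with no float rounding.
int_based = time.time_ns()

print(step_seconds, float_based % 256, len(str(int_based)))
```

In practice this means IDs produced from the float expression are still sortable by creation time, but two calls made within a few hundred nanoseconds of each other can yield the same value.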
DefaultIDGenerator

Bases: BaseIDGenerator

Default ID generator that returns empty strings.

This generator is a placeholder: generate() currently returns an empty string, which signals the framework to substitute an ID from its own default generator (typically UUID-based). If the framework has no default generator configured, UUID4 is used instead.

Methods:

Name Description
generate

Generate a default ID (currently empty string).

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class DefaultIDGenerator(BaseIDGenerator):
    """Default ID generator that returns empty strings.

    This generator is intended as a placeholder that can be configured
    to use framework defaults (typically UUID-based). Currently returns
    empty strings. If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a default ID (currently empty string).

        If empty string is returned, the framework will use its default
        UUID-based generator. If the framework is not configured to use
        UUID generation, it will fall back to UUID4.

        Returns:
            str: An empty string (framework will substitute with UUID).
        """
        # if you keep empty, then it will be used default
        # framework default which is UUID based
        # if framework not using then uuid 4 will be used
        return ""
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a default ID (currently empty string).

An empty return value signals the framework to substitute an ID from its own default generator (typically UUID-based). If the framework has no default generator configured, UUID4 is used instead.

Returns:

Name Type Description
str str

An empty string (framework will substitute with UUID).

Source code in pyagenity/utils/id_generator.py
def generate(self) -> str:
    """Generate a default ID (currently empty string).

    If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.

    Returns:
        str: An empty string (framework will substitute with UUID).
    """
    # if you keep empty, then it will be used default
    # framework default which is UUID based
    # if framework not using then uuid 4 will be used
    return ""
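
The empty-string convention can be sketched as follows. `generate_or_fallback` is a hypothetical helper showing one way a caller could substitute a UUID4 when a generator returns an empty string; the framework's actual substitution code is not shown in this reference, and `FixedGen` exists only for contrast.

```python
import uuid


class DefaultIDGenerator:  # local stand-in: returns "" as documented above
    def generate(self) -> str:
        return ""


class FixedGen:  # hypothetical generator that does return a value
    def generate(self) -> str:
        return "order-42"


def generate_or_fallback(generator) -> str:
    """Hypothetical helper: use the generated ID, or a UUID4 if it is empty."""
    value = generator.generate()
    return value if value != "" else str(uuid.uuid4())


fallback_id = generate_or_fallback(DefaultIDGenerator())
custom_id = generate_or_fallback(FixedGen())
print(fallback_id, custom_id)
```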
HexIDGenerator

Bases: BaseIDGenerator

ID generator that produces hexadecimal strings.

Generates cryptographically secure random hex strings of 32 characters (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).

Methods:

Name Description
generate

Generate a new 32-character hexadecimal string.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class HexIDGenerator(BaseIDGenerator):
    """ID generator that produces hexadecimal strings.

    Generates cryptographically secure random hex strings of 32 characters
    (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 32-character hexadecimal string.

        Returns:
            str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
        """
        return secrets.token_hex(16)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 32-character hexadecimal string.

Returns:

Name Type Description
str str

A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').

Source code in pyagenity/utils/id_generator.py
def generate(self) -> str:
    """Generate a new 32-character hexadecimal string.

    Returns:
        str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
    """
    return secrets.token_hex(16)
IDType

Bases: StrEnum

Enumeration of supported ID types.

Attributes:

Name Type Description
BIGINT
INTEGER
STRING
Source code in pyagenity/utils/id_generator.py
class IDType(enum.StrEnum):
    """Enumeration of supported ID types."""

    STRING = "string"  # String-based IDs
    INTEGER = "integer"  # Integer-based IDs
    BIGINT = "bigint"  # Big integer IDs
Attributes
BIGINT class-attribute instance-attribute
BIGINT = 'bigint'
INTEGER class-attribute instance-attribute
INTEGER = 'integer'
STRING class-attribute instance-attribute
STRING = 'string'
IntIDGenerator

Bases: BaseIDGenerator

ID generator that produces 32-bit random integers.

Generates cryptographically secure random integers using secrets.randbits(32). Values range from 0 to 2^32 - 1 (4,294,967,295).

Methods:

Name Description
generate

Generate a new 32-bit random integer.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class IntIDGenerator(BaseIDGenerator):
    """ID generator that produces 32-bit random integers.

    Generates cryptographically secure random integers using secrets.randbits(32).
    Values range from 0 to 2^32 - 1 (4,294,967,295).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new 32-bit random integer.

        Returns:
            int: A random integer between 0 and 4,294,967,295 (inclusive).
        """
        return secrets.randbits(32)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 32-bit random integer.

Returns:

Name Type Description
int int

A random integer between 0 and 4,294,967,295 (inclusive).

Source code in pyagenity/utils/id_generator.py
def generate(self) -> int:
    """Generate a new 32-bit random integer.

    Returns:
        int: A random integer between 0 and 4,294,967,295 (inclusive).
    """
    return secrets.randbits(32)
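
With only 2^32 possible values, collisions become likely sooner than the range suggests. The worked arithmetic below uses the standard birthday approximation, p ≈ 1 - exp(-n(n-1)/(2N)), to estimate the chance of a duplicate among n random 32-bit IDs. The function names and sample sizes are chosen for this illustration.

```python
import math

SPACE = 2**32  # number of distinct values secrets.randbits(32) can return


def collision_probability(n: int, space: int = SPACE) -> float:
    """Birthday approximation: chance that n random IDs contain a duplicate."""
    return 1.0 - math.exp(-n * (n - 1) / (2 * space))


def ids_for_probability(p: float, space: int = SPACE) -> int:
    """Approximate number of IDs at which the collision chance reaches p."""
    return math.ceil(math.sqrt(2 * space * math.log(1 / (1 - p))))


for n in (1_000, 10_000, 100_000):
    print(n, round(collision_probability(n), 4))
print(ids_for_probability(0.5))
```

By this estimate, a duplicate becomes more likely than not at around 77,000 IDs, so 32-bit random IDs suit small or short-lived ID sets better than large persistent ones.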
ShortIDGenerator

Bases: BaseIDGenerator

ID generator that produces short alphanumeric strings.

Generates 8-character strings using uppercase/lowercase letters and digits. Each character is randomly chosen from 62 possible characters (26 + 26 + 10). Total possible combinations: 62^8 ≈ 2.18 x 10^14.

Methods:

Name Description
generate

Generate a new 8-character alphanumeric string.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class ShortIDGenerator(BaseIDGenerator):
    """ID generator that produces short alphanumeric strings.

    Generates 8-character strings using uppercase/lowercase letters and digits.
    Each character is randomly chosen from 62 possible characters (26 + 26 + 10).
    Total possible combinations: 62^8 ≈ 2.18 x 10^14.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 8-character alphanumeric string.

        Returns:
            str: An 8-character string containing letters and digits
                 (e.g., 'Ab3XyZ9k').
        """
        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(8))
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 8-character alphanumeric string.

Returns:

Name Type Description
str str

An 8-character string containing letters and digits (e.g., 'Ab3XyZ9k').

Source code in pyagenity/utils/id_generator.py
195
196
197
198
199
200
201
202
203
def generate(self) -> str:
    """Generate a new 8-character alphanumeric string.

    Returns:
        str: An 8-character string containing letters and digits
             (e.g., 'Ab3XyZ9k').
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(8))
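
The size of the ID space quoted above can be checked directly, along with the equivalent entropy in bits. This sketch only verifies the arithmetic and builds one ID with the same expression as the source.

```python
import math
import secrets
import string

alphabet = string.ascii_letters + string.digits  # 26 + 26 + 10 = 62 symbols
combinations = len(alphabet) ** 8
entropy_bits = 8 * math.log2(len(alphabet))

# Same construction as generate() above.
short_id = "".join(secrets.choice(alphabet) for _ in range(8))

print(f"{combinations:.3e}", round(entropy_bits, 1), short_id)
```

At just under 48 bits, the space is far larger than that of a 32-bit integer ID but much smaller than the 122 random bits of a UUID4.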
TimestampIDGenerator

Bases: BaseIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

Generates IDs by multiplying current Unix timestamp by 1e6, resulting in integers that are sortable by creation time. Typical size is 16-17 digits.

Methods:

Name Description
generate

Generate a new integer ID based on current microseconds.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class TimestampIDGenerator(BaseIDGenerator):
    """ID generator that produces integer IDs based on current time in microseconds.

    Generates IDs by multiplying current Unix timestamp by 1e6, resulting in
    integers that are sortable by creation time. Typical size is 16-17 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new integer ID based on current microseconds.

        Returns:
            int: An integer (16-17 digits) representing microseconds since Unix epoch.
        """
        return int(time.time() * 1000000)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new integer ID based on current microseconds.

Returns:

Name Type Description
int int

An integer (16-17 digits) representing microseconds since Unix epoch.

Source code in pyagenity/utils/id_generator.py
def generate(self) -> int:
    """Generate a new integer ID based on current microseconds.

    Returns:
        int: An integer (16-17 digits) representing microseconds since Unix epoch.
    """
    return int(time.time() * 1000000)
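
Because these IDs are microseconds since the Unix epoch, they can be converted back to a creation time. A small sketch using only the standard library; `id_to_datetime` is a hypothetical helper, not part of PyAgenity.

```python
import time
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def id_to_datetime(timestamp_id: int) -> datetime:
    """Hypothetical helper: interpret a microsecond ID as a UTC datetime."""
    return EPOCH + timedelta(microseconds=timestamp_id)


new_id = int(time.time() * 1_000_000)  # same expression as generate() above
created = id_to_datetime(new_id)
print(new_id, created.isoformat())
```

Adding a timedelta to the epoch keeps the conversion in integer microseconds, which avoids a second float round trip.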
UUIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).

Methods:

Name Description
generate

Generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType
Source code in pyagenity/utils/id_generator.py
class UUIDGenerator(BaseIDGenerator):
    """ID generator that produces UUID version 4 strings.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        return str(uuid.uuid4())
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in pyagenity/utils/id_generator.py
def generate(self) -> str:
    """Generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    return str(uuid.uuid4())
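
IDs received from outside can be checked against the documented format by parsing them with the standard `uuid` module. `is_uuid4` is a hypothetical helper written for this sketch.

```python
import uuid


def is_uuid4(value: str) -> bool:
    """Hypothetical helper: True if value is a canonical UUID version 4 string."""
    try:
        parsed = uuid.UUID(value)
    except ValueError:
        return False
    # Comparing against str(parsed) rejects non-canonical spellings
    # (uppercase, braces, missing hyphens) that uuid.UUID would accept.
    return parsed.version == 4 and str(parsed) == value


fresh = str(uuid.uuid4())
print(len(fresh), is_uuid4(fresh), is_uuid4("not-a-uuid"))
```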

logging

Centralized logging configuration for PyAgenity.

This module provides logging configuration that can be imported and used throughout the project. Each module should use:

import logging
logger = logging.getLogger(__name__)

This ensures proper hierarchical logging with module-specific loggers.

Typical usage example

import logging
from pyagenity.utils.logging import configure_logging
configure_logging(level=logging.DEBUG)

Functions:

Name Description
configure_logging

Configures the root logger for the PyAgenity project.

Functions

configure_logging
configure_logging(level=logging.INFO, format_string=None, handler=None)

Configures the root logger for the PyAgenity project.

This function sets up logging for all modules under the 'pyagenity' namespace. It ensures that logs are formatted consistently and sent to the appropriate handler.

Parameters:

Name Type Description Default
level
int

Logging level (e.g., logging.INFO, logging.DEBUG). Defaults to logging.INFO.

INFO
format_string
str

Custom format string for log messages. If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".

None
handler
Handler

Custom logging handler. If None, uses StreamHandler to stdout.

None

Returns:

Type Description
None

None

Example

configure_logging(level=logging.DEBUG)
logger = logging.getLogger("pyagenity.module")
logger.info("This is an info message.")

Source code in pyagenity/utils/logging.py
def configure_logging(
    level: int = logging.INFO,
    format_string: str | None = None,
    handler: logging.Handler | None = None,
) -> None:
    """
    Configures the root logger for the PyAgenity project.

    This function sets up logging for all modules under the 'pyagenity' namespace.
    It ensures that logs are formatted consistently and sent to the appropriate handler.

    Args:
        level (int, optional): Logging level (e.g., logging.INFO, logging.DEBUG).
            Defaults to logging.INFO.
        format_string (str, optional): Custom format string for log messages.
            If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".
        handler (logging.Handler, optional): Custom logging handler. If None,
            uses StreamHandler to stdout.

    Returns:
        None

    Raises:
        None

    Example:
        >>> configure_logging(level=logging.DEBUG)
        >>> logger = logging.getLogger("pyagenity.module")
        >>> logger.info("This is an info message.")
    """
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"

    if handler is None:
        handler = logging.StreamHandler(sys.stdout)

    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)

    # Configure root logger for pyagenity
    root_logger = logging.getLogger("pyagenity")
    root_logger.setLevel(level)

    # Only add handler if none exists to avoid duplicates
    if not root_logger.handlers:
        root_logger.addHandler(handler)

    # Prevent propagation to avoid duplicate logs
    root_logger.propagate = False
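
The duplicate-handler guard in the listing above has a consequence that is easy to miss: a second call still updates the level, but its handler is not installed because one already exists. The sketch below illustrates this with a stand-in function, `configure_logging_sketch`, that mirrors the listing but takes a hypothetical `namespace` argument so the demo does not touch the real `pyagenity` logger. The stand-in name, the `namespace` parameter, and the `pyagenity_demo` logger name are assumptions made for illustration only.

```python
import io
import logging
import sys


def configure_logging_sketch(level=logging.INFO, format_string=None,
                             handler=None, namespace="pyagenity_demo"):
    """Stand-in mirroring the listing above; `namespace` is hypothetical."""
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"
    if handler is None:
        handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(format_string))

    root = logging.getLogger(namespace)
    root.setLevel(level)
    # Same guard as the listing: only the first handler is ever attached.
    if not root.handlers:
        root.addHandler(handler)
    root.propagate = False
    return root


first = io.StringIO()
second = io.StringIO()

# First call installs a handler that writes to `first`.
root = configure_logging_sketch(
    level=logging.INFO,
    format_string="%(levelname)s %(name)s: %(message)s",
    handler=logging.StreamHandler(first),
)
# Second call lowers the level to DEBUG, but its handler is not added.
configure_logging_sketch(level=logging.DEBUG,
                         handler=logging.StreamHandler(second))

logging.getLogger("pyagenity_demo.module").debug("visible after second call")

handler_count = len(root.handlers)
first_output = first.getvalue()
second_output = second.getvalue()
```

Under these assumptions the message reaches only the first stream. To swap handlers in practice, the existing handler would have to be removed from the logger before calling the function again.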

message

Message and content block primitives for agent graphs.

This module defines the core message representation, multimodal content blocks, token usage tracking, and utility functions for agent graph communication.

Classes:

Name Description
TokenUsages

Tracks token usage statistics for a message or model response.

MediaRef

Reference to media content (image/audio/video/document/data).

AnnotationRef

Reference to annotation metadata.

Message

Represents a message in a conversation, including content, role, metadata, and token usage.

Functions:

Name Description
generate_id

Generates a message or tool call ID based on DI context and type.

Attributes:

Name Type Description
ContentBlock
logger

Attributes

ContentBlock module-attribute
ContentBlock = Annotated[Union[TextBlock, ImageBlock, AudioBlock, VideoBlock, DocumentBlock, DataBlock, ToolCallBlock, ToolResultBlock, ReasoningBlock, AnnotationBlock, ErrorBlock], Field(discriminator='type')]
logger module-attribute
logger = getLogger(__name__)
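
`Field(discriminator='type')` means the `type` value of each incoming block decides which block class validates it. The sketch below mimics that idea with plain dataclasses and a lookup table so it runs without the library. `TextBlockSketch`, `ErrorBlockSketch`, `BLOCK_TYPES`, and `parse_block` are hypothetical stand-ins, not PyAgenity or pydantic APIs, and only two of the eleven block types are modelled.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class TextBlockSketch:
    text: str
    type: str = "text"


@dataclass
class ErrorBlockSketch:
    message: str
    code: str | None = None
    type: str = "error"


# Map each discriminator value to the class that handles it.
BLOCK_TYPES = {"text": TextBlockSketch, "error": ErrorBlockSketch}


def parse_block(raw: dict[str, Any]):
    """Pick the block class from raw["type"], as a discriminated union would."""
    kind = raw.get("type")
    if kind not in BLOCK_TYPES:
        raise ValueError(f"Unknown block type: {kind!r}")
    return BLOCK_TYPES[kind](**raw)


blocks = [
    parse_block({"type": "text", "text": "Hello!"}),
    parse_block({"type": "error", "message": "boom", "code": "E1"}),
]
```

An input whose `type` is missing or unknown is rejected up front instead of being tried against every member of the union.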

Classes

AnnotationBlock

Bases: BaseModel

Annotation content block for messages.

Attributes:

Name Type Description
type Literal['annotation']

Block type discriminator.

kind Literal['citation', 'note']

Kind of annotation.

refs list[AnnotationRef]

List of annotation references.

spans list[tuple[int, int]] | None

Spans covered by the annotation.

Source code in pyagenity/utils/message.py
class AnnotationBlock(BaseModel):
    """
    Annotation content block for messages.

    Attributes:
        type (Literal["annotation"]): Block type discriminator.
        kind (Literal["citation", "note"]): Kind of annotation.
        refs (list[AnnotationRef]): List of annotation references.
        spans (list[tuple[int, int]] | None): Spans covered by the annotation.
    """

    type: Literal["annotation"] = "annotation"
    kind: Literal["citation", "note"] = "citation"
    refs: list[AnnotationRef] = Field(default_factory=list)
    spans: list[tuple[int, int]] | None = None
Attributes
kind class-attribute instance-attribute
kind = 'citation'
refs class-attribute instance-attribute
refs = Field(default_factory=list)
spans class-attribute instance-attribute
spans = None
type class-attribute instance-attribute
type = 'annotation'
AnnotationRef

Bases: BaseModel

Reference to annotation metadata (e.g., citation, note).

Attributes:

Name Type Description
url str | None

URL to annotation source.

file_id str | None

Provider-managed file ID.

page int | None

Page number (if applicable).

index int | None

Index within the annotation source.

title str | None

Title of the annotation.

Source code in pyagenity/utils/message.py
class AnnotationRef(BaseModel):
    """
    Reference to annotation metadata (e.g., citation, note).

    Attributes:
        url (str | None): URL to annotation source.
        file_id (str | None): Provider-managed file ID.
        page (int | None): Page number (if applicable).
        index (int | None): Index within the annotation source.
        title (str | None): Title of the annotation.
    """

    url: str | None = None
    file_id: str | None = None
    page: int | None = None
    index: int | None = None
    title: str | None = None
Attributes
file_id class-attribute instance-attribute
file_id = None
index class-attribute instance-attribute
index = None
page class-attribute instance-attribute
page = None
title class-attribute instance-attribute
title = None
url class-attribute instance-attribute
url = None
AudioBlock

Bases: BaseModel

Audio content block for messages.

Attributes:

Name Type Description
type Literal['audio']

Block type discriminator.

media MediaRef

Reference to audio media.

transcript str | None

Transcript of audio.

sample_rate int | None

Sample rate in Hz.

channels int | None

Number of audio channels.

Source code in pyagenity/utils/message.py
class AudioBlock(BaseModel):
    """
    Audio content block for messages.

    Attributes:
        type (Literal["audio"]): Block type discriminator.
        media (MediaRef): Reference to audio media.
        transcript (str | None): Transcript of audio.
        sample_rate (int | None): Sample rate in Hz.
        channels (int | None): Number of audio channels.
    """

    type: Literal["audio"] = "audio"
    media: MediaRef
    transcript: str | None = None
    sample_rate: int | None = None
    channels: int | None = None
Attributes
channels class-attribute instance-attribute
channels = None
media instance-attribute
media
sample_rate class-attribute instance-attribute
sample_rate = None
transcript class-attribute instance-attribute
transcript = None
type class-attribute instance-attribute
type = 'audio'
DataBlock

Bases: BaseModel

Data content block for messages.

Attributes:

Name Type Description
type Literal['data']

Block type discriminator.

mime_type str

MIME type of the data.

data_base64 str | None

Base64-encoded data.

media MediaRef | None

Reference to associated media.

Source code in pyagenity/utils/message.py
class DataBlock(BaseModel):
    """
    Data content block for messages.

    Attributes:
        type (Literal["data"]): Block type discriminator.
        mime_type (str): MIME type of the data.
        data_base64 (str | None): Base64-encoded data.
        media (MediaRef | None): Reference to associated media.
    """

    type: Literal["data"] = "data"
    mime_type: str
    data_base64: str | None = None
    media: MediaRef | None = None
Attributes
data_base64 class-attribute instance-attribute
data_base64 = None
media class-attribute instance-attribute
media = None
mime_type instance-attribute
mime_type
type class-attribute instance-attribute
type = 'data'
DocumentBlock

Bases: BaseModel

Document content block for messages.

Attributes:

Name Type Description
type Literal['document']

Block type discriminator.

media MediaRef

Reference to document media.

pages list[int] | None

List of page numbers.

excerpt str | None

Excerpt from the document.

Source code in pyagenity/utils/message.py
class DocumentBlock(BaseModel):
    """
    Document content block for messages.

    Attributes:
        type (Literal["document"]): Block type discriminator.
        media (MediaRef): Reference to document media.
        pages (list[int] | None): List of page numbers.
        excerpt (str | None): Excerpt from the document.
    """

    type: Literal["document"] = "document"
    media: MediaRef
    pages: list[int] | None = None
    excerpt: str | None = None
Attributes
excerpt class-attribute instance-attribute
excerpt = None
media instance-attribute
media
pages class-attribute instance-attribute
pages = None
type class-attribute instance-attribute
type = 'document'
ErrorBlock

Bases: BaseModel

Error content block for messages.

Attributes:

Name Type Description
type Literal['error']

Block type discriminator.

message str

Error message.

code str | None

Error code.

data dict[str, Any] | None

Additional error data.

Source code in pyagenity/utils/message.py
class ErrorBlock(BaseModel):
    """
    Error content block for messages.

    Attributes:
        type (Literal["error"]): Block type discriminator.
        message (str): Error message.
        code (str | None): Error code.
        data (dict[str, Any] | None): Additional error data.
    """

    type: Literal["error"] = "error"
    message: str
    code: str | None = None
    data: dict[str, Any] | None = None
Attributes
code class-attribute instance-attribute
code = None
data class-attribute instance-attribute
data = None
message instance-attribute
message
type class-attribute instance-attribute
type = 'error'
ImageBlock

Bases: BaseModel

Image content block for messages.

Attributes:

Name Type Description
type Literal['image']

Block type discriminator.

media MediaRef

Reference to image media.

alt_text str | None

Alternative text for accessibility.

bbox list[float] | None

Bounding box coordinates [x1, y1, x2, y2].

Source code in pyagenity/utils/message.py
class ImageBlock(BaseModel):
    """
    Image content block for messages.

    Attributes:
        type (Literal["image"]): Block type discriminator.
        media (MediaRef): Reference to image media.
        alt_text (str | None): Alternative text for accessibility.
        bbox (list[float] | None): Bounding box coordinates [x1, y1, x2, y2].
    """

    type: Literal["image"] = "image"
    media: MediaRef
    alt_text: str | None = None
    bbox: list[float] | None = None  # [x1,y1,x2,y2] if applicable
Attributes
alt_text class-attribute instance-attribute
alt_text = None
bbox class-attribute instance-attribute
bbox = None
media instance-attribute
media
type class-attribute instance-attribute
type = 'image'
MediaRef

Bases: BaseModel

Reference to media content (image/audio/video/document/data).

Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

Attributes:

Name Type Description
kind Literal['url', 'file_id', 'data']

Type of reference.

url str | None

URL to media content.

file_id str | None

Provider-managed file ID.

data_base64 str | None

Base64-encoded data (small payloads only).

mime_type str | None

MIME type of the media.

size_bytes int | None

Size in bytes.

sha256 str | None

SHA256 hash of the media.

filename str | None

Filename of the media.

width int | None

Image width (if applicable).

height int | None

Image height (if applicable).

duration_ms int | None

Duration in milliseconds (if applicable).

page int | None

Page number (if applicable).

Source code in pyagenity/utils/message.py
class MediaRef(BaseModel):
    """
    Reference to media content (image/audio/video/document/data).

    Prefer referencing by URL or provider file_id over inlining base64 for large payloads.

    Attributes:
        kind (Literal["url", "file_id", "data"]): Type of reference.
        url (str | None): URL to media content.
        file_id (str | None): Provider-managed file ID.
        data_base64 (str | None): Base64-encoded data (small payloads only).
        mime_type (str | None): MIME type of the media.
        size_bytes (int | None): Size in bytes.
        sha256 (str | None): SHA256 hash of the media.
        filename (str | None): Filename of the media.
        width (int | None): Image width (if applicable).
        height (int | None): Image height (if applicable).
        duration_ms (int | None): Duration in milliseconds (if applicable).
        page (int | None): Page number (if applicable).
    """

    kind: Literal["url", "file_id", "data"] = "url"
    url: str | None = None  # http(s) or data: URL
    file_id: str | None = None  # provider-managed ID (e.g., OpenAI/Gemini)
    data_base64: str | None = None  # small payloads only
    mime_type: str | None = None
    size_bytes: int | None = None
    sha256: str | None = None
    filename: str | None = None
    # Media-specific hints
    width: int | None = None
    height: int | None = None
    duration_ms: int | None = None
    page: int | None = None
Attributes
data_base64 class-attribute instance-attribute
data_base64 = None
duration_ms class-attribute instance-attribute
duration_ms = None
file_id class-attribute instance-attribute
file_id = None
filename class-attribute instance-attribute
filename = None
height class-attribute instance-attribute
height = None
kind class-attribute instance-attribute
kind = 'url'
mime_type class-attribute instance-attribute
mime_type = None
page class-attribute instance-attribute
page = None
sha256 class-attribute instance-attribute
sha256 = None
size_bytes class-attribute instance-attribute
size_bytes = None
url class-attribute instance-attribute
url = None
width class-attribute instance-attribute
width = None
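
The guidance to prefer a URL or `file_id` over inline base64 for large payloads can be sketched as a size check. The example below uses a dataclass stand-in, `MediaRefSketch`, that carries a subset of the fields listed above. The helper `make_ref` and the 64 KiB `INLINE_LIMIT_BYTES` threshold are hypothetical choices for illustration, since the library does not state a limit here.

```python
from __future__ import annotations

import base64
import hashlib
from dataclasses import dataclass


@dataclass
class MediaRefSketch:
    """Stand-in with a subset of the MediaRef fields documented above."""
    kind: str = "url"
    url: str | None = None
    data_base64: str | None = None
    mime_type: str | None = None
    size_bytes: int | None = None
    sha256: str | None = None


INLINE_LIMIT_BYTES = 64 * 1024  # hypothetical threshold, not a library constant


def make_ref(payload: bytes, mime_type: str, fallback_url: str) -> MediaRefSketch:
    """Inline small payloads as base64; reference large ones by URL."""
    common = {
        "mime_type": mime_type,
        "size_bytes": len(payload),
        "sha256": hashlib.sha256(payload).hexdigest(),
    }
    if len(payload) <= INLINE_LIMIT_BYTES:
        encoded = base64.b64encode(payload).decode("ascii")
        return MediaRefSketch(kind="data", data_base64=encoded, **common)
    return MediaRefSketch(kind="url", url=fallback_url, **common)


small = make_ref(b"hello", "text/plain", "https://example.com/hello.txt")
large = make_ref(b"x" * (INLINE_LIMIT_BYTES + 1), "application/octet-stream",
                 "https://example.com/blob.bin")
```

Recording `size_bytes` and `sha256` in both branches keeps the reference verifiable regardless of how the bytes are delivered.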
Message

Bases: BaseModel

Represents a message in a conversation, including content, role, metadata, and token usage.

Attributes:

Name Type Description
message_id str | int

Unique identifier for the message.

role Literal['user', 'assistant', 'system', 'tool']

The role of the message sender.

content list[ContentBlock]

The message content blocks.

delta bool

Indicates if this is a delta/partial message.

tools_calls list[dict[str, Any]] | None

Tool call information, if any.

reasoning str | None

Reasoning or explanation, if any.

timestamp datetime | None

Timestamp of the message.

metadata dict[str, Any]

Additional metadata.

usages TokenUsages | None

Token usage statistics.

raw dict[str, Any] | None

Raw data, if any.

Example

msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])

Methods:

Name Description
attach_media

Append a media block to the content.

text

Best-effort text extraction from content blocks.

text_message

Create a Message instance from plain text.

tool_message

Create a tool message (role "tool") from content blocks.

Source code in pyagenity/utils/message.py
class Message(BaseModel):
    """
    Represents a message in a conversation, including content, role, metadata, and token usage.

    Attributes:
        message_id (str | int): Unique identifier for the message.
        role (Literal["user", "assistant", "system", "tool"]): The role of the message sender.
        content (list[ContentBlock]): The message content blocks.
        delta (bool): Indicates if this is a delta/partial message.
        tools_calls (list[dict[str, Any]] | None): Tool call information, if any.
        reasoning (str | None): Reasoning or explanation, if any.
        timestamp (datetime | None): Timestamp of the message.
        metadata (dict[str, Any]): Additional metadata.
        usages (TokenUsages | None): Token usage statistics.
        raw (dict[str, Any] | None): Raw data, if any.

    Example:
        >>> msg = Message(message_id="abc123", role="user", content=[TextBlock(text="Hello!")])
        {'message_id': 'abc123', 'role': 'user', 'content': [...], ...}
    """

    message_id: str | int = Field(default_factory=lambda: generate_id(None))
    role: Literal["user", "assistant", "system", "tool"]
    content: list[ContentBlock]
    delta: bool = False  # Indicates if this is a delta/partial message
    tools_calls: list[dict[str, Any]] | None = None
    reasoning: str | None = None  # Remove it
    timestamp: datetime | None = Field(default_factory=datetime.now)
    metadata: dict[str, Any] = Field(default_factory=dict)
    usages: TokenUsages | None = None
    raw: dict[str, Any] | None = None

    @classmethod
    def text_message(
        cls,
        content: str,
        role: Literal["user", "assistant", "system", "tool"] = "user",
        message_id: str | None = None,
    ) -> "Message":
        """
        Create a Message instance from plain text.

        Args:
            content (str): The message content.
            role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
            message_id (str | None): Optional message ID.

        Returns:
            Message: The created Message instance.

        Example:
            >>> Message.text_message("Hello!", role="user")
        """
        logger.debug("Creating message from text with role: %s", role)
        return cls(
            message_id=generate_id(message_id),
            role=role,
            content=[TextBlock(text=content)],
            timestamp=datetime.now(),
            metadata={},
        )

    @classmethod
    def tool_message(
        cls,
        content: list[ContentBlock],
        message_id: str | None = None,
        meta: dict[str, Any] | None = None,
    ) -> "Message":
        """
        Create a tool message, optionally marking it as an error.

        Args:
            content (list[ContentBlock]): The message content blocks.
            message_id (str | None): Optional message ID.
            meta (dict[str, Any] | None): Optional metadata.

        Returns:
            Message: The created tool message instance.

        Example:
            >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
        """
        res = content
        msg_id = generate_id(message_id)
        return cls(
            message_id=msg_id,
            role="tool",
            content=res,
            timestamp=datetime.now(),
            metadata=meta or {},
        )

    # --- Convenience helpers ---
    def text(self) -> str:
        """
        Best-effort text extraction from content blocks.

        Returns:
            str: Concatenated text from TextBlock and ToolResultBlock outputs.

        Example:
            >>> msg.text()
            'Hello!Result text.'
        """
        parts: list[str] = []
        for block in self.content:
            if isinstance(block, TextBlock):
                parts.append(block.text)
            elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
                parts.append(block.output)
        return "".join(parts)

    def attach_media(
        self,
        media: MediaRef,
        as_type: Literal["image", "audio", "video", "document"],
    ) -> None:
        """
        Append a media block to the content.

        If content was text, creates a block list. Supports image, audio, video, and document types.

        Args:
            media (MediaRef): Reference to media content.
            as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

        Returns:
            None

        Raises:
            ValueError: If an unsupported media type is provided.

        Example:
            >>> msg.attach_media(media_ref, as_type="image")
        """
        block: ContentBlock
        if as_type == "image":
            block = ImageBlock(media=media)
        elif as_type == "audio":
            block = AudioBlock(media=media)
        elif as_type == "video":
            block = VideoBlock(media=media)
        elif as_type == "document":
            block = DocumentBlock(media=media)
        else:
            raise ValueError(f"Unsupported media type: {as_type}")

        if isinstance(self.content, str):
            self.content = [TextBlock(text=self.content), block]
        elif isinstance(self.content, list):
            self.content.append(block)
        else:
            self.content = [block]
Attributes
content instance-attribute
content
delta class-attribute instance-attribute
delta = False
message_id class-attribute instance-attribute
message_id = Field(default_factory=lambda: generate_id(None))
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
raw class-attribute instance-attribute
raw = None
reasoning class-attribute instance-attribute
reasoning = None
role instance-attribute
role
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=now)
tools_calls class-attribute instance-attribute
tools_calls = None
usages class-attribute instance-attribute
usages = None
Functions
attach_media
attach_media(media, as_type)

Append a media block to the content.

If content is a plain string, it is first converted to a block list, and the media block is then appended. Supports image, audio, video, and document types.

Parameters:

Name Type Description Default
media MediaRef

Reference to media content.

required
as_type Literal['image', 'audio', 'video', 'document']

Type of media block to append.

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

If an unsupported media type is provided.

Example

msg.attach_media(media_ref, as_type="image")

Source code in pyagenity/utils/message.py
def attach_media(
    self,
    media: MediaRef,
    as_type: Literal["image", "audio", "video", "document"],
) -> None:
    """
    Append a media block to the content.

    If content was text, creates a block list. Supports image, audio, video, and document types.

    Args:
        media (MediaRef): Reference to media content.
        as_type (Literal["image", "audio", "video", "document"]): Type of media block to append.

    Returns:
        None

    Raises:
        ValueError: If an unsupported media type is provided.

    Example:
        >>> msg.attach_media(media_ref, as_type="image")
    """
    block: ContentBlock
    if as_type == "image":
        block = ImageBlock(media=media)
    elif as_type == "audio":
        block = AudioBlock(media=media)
    elif as_type == "video":
        block = VideoBlock(media=media)
    elif as_type == "document":
        block = DocumentBlock(media=media)
    else:
        raise ValueError(f"Unsupported media type: {as_type}")

    if isinstance(self.content, str):
        self.content = [TextBlock(text=self.content), block]
    elif isinstance(self.content, list):
        self.content.append(block)
    else:
        self.content = [block]
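
Note that `as_type` accepts only four of the block types. A value such as "data", although it is a valid content block type elsewhere in this module, falls through to the `ValueError` branch. The sketch below reproduces that behaviour with dataclass stand-ins so it runs without the library. `MediaRefSketch`, `MediaBlockSketch`, and `MessageSketch` are hypothetical names, and the single generic block class replaces the four concrete block classes used in the listing.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class MediaRefSketch:
    url: str | None = None
    kind: str = "url"


@dataclass
class MediaBlockSketch:
    """One generic block standing in for Image/Audio/Video/DocumentBlock."""
    type: str
    media: MediaRefSketch


@dataclass
class MessageSketch:
    role: str
    content: list = field(default_factory=list)

    def attach_media(self, media: MediaRefSketch, as_type: str) -> None:
        # Only these four types are accepted, matching the listing above.
        if as_type not in {"image", "audio", "video", "document"}:
            raise ValueError(f"Unsupported media type: {as_type}")
        self.content.append(MediaBlockSketch(type=as_type, media=media))


msg = MessageSketch(role="user")
msg.attach_media(MediaRefSketch(url="https://example.com/cat.png"), as_type="image")
msg.attach_media(MediaRefSketch(url="https://example.com/report.pdf"), as_type="document")
attached_types = [block.type for block in msg.content]

try:
    msg.attach_media(MediaRefSketch(url="https://example.com/raw.bin"), as_type="data")
    error_text = None
except ValueError as exc:
    error_text = str(exc)
```

Blocks are appended in call order, and a rejected call leaves the content list unchanged.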
text
text()

Best-effort text extraction from content blocks.

Returns:

Name Type Description
str str

Concatenated text from TextBlock and ToolResultBlock outputs.

Example

msg.text()
'Hello!Result text.'

Source code in pyagenity/utils/message.py
def text(self) -> str:
    """
    Best-effort text extraction from content blocks.

    Returns:
        str: Concatenated text from TextBlock and ToolResultBlock outputs.

    Example:
        >>> msg.text()
        'Hello!Result text.'
    """
    parts: list[str] = []
    for block in self.content:
        if isinstance(block, TextBlock):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlock) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)
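
Two details of the extraction are worth making concrete: parts are joined with no separator, and tool results whose output is not a string are skipped silently. The sketch below copies the loop from the listing into a free function, `extract_text`, and runs it over dataclass stand-ins. `TextBlockSketch`, `ToolResultBlockSketch`, and `extract_text` are hypothetical names used so the example runs without the library.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class TextBlockSketch:
    text: str


@dataclass
class ToolResultBlockSketch:
    call_id: str
    output: Any = None


def extract_text(content: list) -> str:
    """Same loop as Message.text(), applied to the stand-in blocks."""
    parts = []
    for block in content:
        if isinstance(block, TextBlockSketch):
            parts.append(block.text)
        elif isinstance(block, ToolResultBlockSketch) and isinstance(block.output, str):
            parts.append(block.output)
    return "".join(parts)


content = [
    TextBlockSketch("Hello!"),
    ToolResultBlockSketch("call_1", {"rows": 3}),    # dict output: skipped
    ToolResultBlockSketch("call_2", "Result text."),  # str output: kept
]
joined = extract_text(content)
```

Because no separator is inserted, callers that need readable output may want to join the individual blocks themselves.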
text_message classmethod
text_message(content, role='user', message_id=None)

Create a Message instance from plain text.

Parameters:

Name Type Description Default
content str

The message content.

required
role Literal['user', 'assistant', 'system', 'tool']

The role of the sender.

'user'
message_id str | None

Optional message ID.

None

Returns:

Name Type Description
Message Message

The created Message instance.

Example

Message.text_message("Hello!", role="user")

Source code in pyagenity/utils/message.py
@classmethod
def text_message(
    cls,
    content: str,
    role: Literal["user", "assistant", "system", "tool"] = "user",
    message_id: str | None = None,
) -> "Message":
    """
    Create a Message instance from plain text.

    Args:
        content (str): The message content.
        role (Literal["user", "assistant", "system", "tool"]): The role of the sender.
        message_id (str | None): Optional message ID.

    Returns:
        Message: The created Message instance.

    Example:
        >>> Message.text_message("Hello!", role="user")
    """
    logger.debug("Creating message from text with role: %s", role)
    return cls(
        message_id=generate_id(message_id),
        role=role,
        content=[TextBlock(text=content)],
        timestamp=datetime.now(),
        metadata={},
    )
tool_message classmethod
tool_message(content, message_id=None, meta=None)

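Create a tool message (role "tool") from content blocks. The method itself takes no error flag; an error is expressed in the content, for example through the is_error field of a ToolResultBlock.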

Parameters:

Name Type Description Default
content list[ContentBlock]

The message content blocks.

required
message_id str | None

Optional message ID.

None
meta dict[str, Any] | None

Optional metadata.

None

Returns:

Name Type Description
Message Message

The created tool message instance.

Example

Message.tool_message([ToolResultBlock(...)], message_id="tool1")

Source code in pyagenity/utils/message.py
@classmethod
def tool_message(
    cls,
    content: list[ContentBlock],
    message_id: str | None = None,
    meta: dict[str, Any] | None = None,
) -> "Message":
    """
    Create a tool message, optionally marking it as an error.

    Args:
        content (list[ContentBlock]): The message content blocks.
        message_id (str | None): Optional message ID.
        meta (dict[str, Any] | None): Optional metadata.

    Returns:
        Message: The created tool message instance.

    Example:
        >>> Message.tool_message([ToolResultBlock(...)], message_id="tool1")
    """
    res = content
    msg_id = generate_id(message_id)
    return cls(
        message_id=msg_id,
        role="tool",
        content=res,
        timestamp=datetime.now(),
        metadata=meta or {},
    )
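
Since `tool_message` has no error parameter, a failed tool run would be expressed on the result block itself. The sketch below shows that pattern with dataclass stand-ins. `ToolResultBlockSketch`, `MessageSketch`, and `tool_message_sketch` are hypothetical names, and a random UUID stands in for the library's `generate_id`, whose exact behaviour is not shown in this section.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolResultBlockSketch:
    call_id: str
    output: Any = None
    is_error: bool = False
    status: str | None = None
    type: str = "tool_result"


@dataclass
class MessageSketch:
    message_id: str
    role: str
    content: list
    metadata: dict = field(default_factory=dict)


def tool_message_sketch(content, message_id=None, meta=None):
    # Assumption: a random UUID replaces the library's generate_id().
    msg_id = message_id if message_id is not None else str(uuid.uuid4())
    return MessageSketch(message_id=msg_id, role="tool",
                         content=content, metadata=meta or {})


# The error lives on the block, not on the message.
failed = ToolResultBlockSketch(call_id="call_1", output="timeout after 30s",
                               is_error=True, status="failed")
msg = tool_message_sketch([failed], message_id="tool1")
```

The `meta or {}` expression in the listing means that passing `None` or an empty dict both yield a fresh empty metadata dict.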
ReasoningBlock

Bases: BaseModel

Reasoning content block for messages.

Attributes:

Name Type Description
type Literal['reasoning']

Block type discriminator.

summary str

Summary of reasoning.

details list[str] | None

Detailed reasoning steps.

Source code in pyagenity/utils/message.py
class ReasoningBlock(BaseModel):
    """
    Reasoning content block for messages.

    Attributes:
        type (Literal["reasoning"]): Block type discriminator.
        summary (str): Summary of reasoning.
        details (list[str] | None): Detailed reasoning steps.
    """

    type: Literal["reasoning"] = "reasoning"
    summary: str
    details: list[str] | None = None
Attributes
details class-attribute instance-attribute
details = None
summary instance-attribute
summary
type class-attribute instance-attribute
type = 'reasoning'
TextBlock

Bases: BaseModel

Text content block for messages.

Attributes:

Name Type Description
type Literal['text']

Block type discriminator.

text str

Text content.

annotations list[AnnotationRef]

List of annotation references.

Source code in pyagenity/utils/message.py
class TextBlock(BaseModel):
    """
    Text content block for messages.

    Attributes:
        type (Literal["text"]): Block type discriminator.
        text (str): Text content.
        annotations (list[AnnotationRef]): List of annotation references.
    """

    type: Literal["text"] = "text"
    text: str
    annotations: list[AnnotationRef] = Field(default_factory=list)
Attributes
annotations class-attribute instance-attribute
annotations = Field(default_factory=list)
text instance-attribute
text
type class-attribute instance-attribute
type = 'text'
TokenUsages

Bases: BaseModel

Tracks token usage statistics for a message or model response.

Attributes:

Name Type Description
completion_tokens int

Number of completion tokens used.

prompt_tokens int

Number of prompt tokens used.

total_tokens int

Total tokens used.

reasoning_tokens int

Reasoning tokens used (optional).

cache_creation_input_tokens int

Cache creation input tokens (optional).

cache_read_input_tokens int

Cache read input tokens (optional).

image_tokens int | None

Image tokens for multimodal models (optional).

audio_tokens int | None

Audio tokens for multimodal models (optional).

Example

usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)

Source code in pyagenity/utils/message.py
class TokenUsages(BaseModel):
    """
    Tracks token usage statistics for a message or model response.

    Attributes:
        completion_tokens (int): Number of completion tokens used.
        prompt_tokens (int): Number of prompt tokens used.
        total_tokens (int): Total tokens used.
        reasoning_tokens (int): Reasoning tokens used (optional).
        cache_creation_input_tokens (int): Cache creation input tokens (optional).
        cache_read_input_tokens (int): Cache read input tokens (optional).
        image_tokens (int | None): Image tokens for multimodal models (optional).
        audio_tokens (int | None): Audio tokens for multimodal models (optional).

    Example:
        >>> usage = TokenUsages(completion_tokens=10, prompt_tokens=20, total_tokens=30)
        {'completion_tokens': 10, 'prompt_tokens': 20, 'total_tokens': 30, ...}
    """

    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    # Optional modality-specific usage fields for multimodal models
    image_tokens: int | None = 0
    audio_tokens: int | None = 0
Attributes
audio_tokens class-attribute instance-attribute
audio_tokens = 0
cache_creation_input_tokens class-attribute instance-attribute
cache_creation_input_tokens = 0
cache_read_input_tokens class-attribute instance-attribute
cache_read_input_tokens = 0
completion_tokens instance-attribute
completion_tokens
image_tokens class-attribute instance-attribute
image_tokens = 0
prompt_tokens instance-attribute
prompt_tokens
reasoning_tokens class-attribute instance-attribute
reasoning_tokens = 0
total_tokens instance-attribute
total_tokens
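
A common use of these counters is accumulating usage across several model responses. The sketch below sums two records field by field using a dataclass stand-in, `TokenUsagesSketch`, with the same fields and defaults as the listing above. The helper `add_usages` is hypothetical and not part of the library. It treats `None` in the optional modality fields as zero, which is an assumption of this sketch.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class TokenUsagesSketch:
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    reasoning_tokens: int = 0
    cache_creation_input_tokens: int = 0
    cache_read_input_tokens: int = 0
    image_tokens: int | None = 0
    audio_tokens: int | None = 0


def add_usages(a: TokenUsagesSketch, b: TokenUsagesSketch) -> TokenUsagesSketch:
    """Sum every counter; None in an optional field counts as 0."""
    totals = {}
    for f in fields(TokenUsagesSketch):
        totals[f.name] = (getattr(a, f.name) or 0) + (getattr(b, f.name) or 0)
    return TokenUsagesSketch(**totals)


first = TokenUsagesSketch(completion_tokens=10, prompt_tokens=20, total_tokens=30)
second = TokenUsagesSketch(completion_tokens=5, prompt_tokens=7, total_tokens=12,
                           reasoning_tokens=3, image_tokens=None)
combined = add_usages(first, second)
```

Iterating over `fields()` keeps the helper in step with the record definition if counters are added later.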
ToolCallBlock

Bases: BaseModel

Tool call content block for messages.

Attributes:

Name Type Description
type Literal['tool_call']

Block type discriminator.

id str

Tool call ID.

name str

Tool name.

args dict[str, Any]

Arguments for the tool call.

tool_type str | None

Type of tool (e.g., web_search, file_search).

Source code in pyagenity/utils/message.py
class ToolCallBlock(BaseModel):
    """
    Tool call content block for messages.

    Attributes:
        type (Literal["tool_call"]): Block type discriminator.
        id (str): Tool call ID.
        name (str): Tool name.
        args (dict[str, Any]): Arguments for the tool call.
        tool_type (str | None): Type of tool (e.g., web_search, file_search).
    """

    type: Literal["tool_call"] = "tool_call"
    id: str
    name: str
    args: dict[str, Any] = Field(default_factory=dict)
    tool_type: str | None = None  # e.g., web_search, file_search, computer_use
Attributes
args class-attribute instance-attribute
args = Field(default_factory=dict)
id instance-attribute
id
name instance-attribute
name
tool_type class-attribute instance-attribute
tool_type = None
type class-attribute instance-attribute
type = 'tool_call'
ToolResultBlock

Bases: BaseModel

Tool result content block for messages.

Attributes:

Name Type Description
type Literal['tool_result']

Block type discriminator.

call_id str

Tool call ID.

output Any

Output from the tool (str, dict, MediaRef, or list of blocks).

is_error bool

Whether the result is an error.

status Literal['completed', 'failed'] | None

Status of the tool call.

Source code in pyagenity/utils/message.py
class ToolResultBlock(BaseModel):
    """
    Tool result content block for messages.

    Attributes:
        type (Literal["tool_result"]): Block type discriminator.
        call_id (str): Tool call ID.
        output (Any): Output from the tool (str, dict, MediaRef, or list of blocks).
        is_error (bool): Whether the result is an error.
        status (Literal["completed", "failed"] | None): Status of the tool call.
    """

    type: Literal["tool_result"] = "tool_result"
    call_id: str
    output: Any = None  # str | dict | MediaRef | list[ContentBlock-like]
    is_error: bool = False
    status: Literal["completed", "failed"] | None = None
Attributes
call_id instance-attribute
call_id
is_error class-attribute instance-attribute
is_error = False
output class-attribute instance-attribute
output = None
status class-attribute instance-attribute
status = None
type class-attribute instance-attribute
type = 'tool_result'
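
A tool result points back to its originating call through call_id, which is expected to match the id of a ToolCallBlock. The sketch below illustrates that pairing using only the standard library. CallStub and ResultStub are hypothetical dataclasses that mirror the documented fields; they are not the pydantic models from pyagenity.utils.message, and pair_results is an illustrative helper rather than a library function.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class CallStub:
    """Hypothetical stand-in mirroring ToolCallBlock's documented fields."""
    id: str
    name: str
    args: dict = field(default_factory=dict)
    tool_type: Optional[str] = None
    type: str = "tool_call"


@dataclass
class ResultStub:
    """Hypothetical stand-in mirroring ToolResultBlock's documented fields."""
    call_id: str
    output: Any = None
    is_error: bool = False
    status: Optional[str] = None
    type: str = "tool_result"


def pair_results(calls, results):
    """Map each call id to its result, or None if no result exists yet."""
    by_call_id = {r.call_id: r for r in results}
    return {c.id: by_call_id.get(c.id) for c in calls}


calls = [
    CallStub(id="call-1", name="web_search", args={"q": "pyagenity"}),
    CallStub(id="call-2", name="file_search"),
]
results = [ResultStub(call_id="call-1", output="3 hits", status="completed")]

paired = pair_results(calls, results)
print(paired["call-1"].output)  # 3 hits
print(paired["call-2"])         # None
```

Keying results by call_id makes it straightforward to spot calls that are still pending, such as call-2 here.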
VideoBlock

Bases: BaseModel

Video content block for messages.

Attributes:

Name Type Description
type Literal['video']

Block type discriminator.

media MediaRef

Reference to video media.

thumbnail MediaRef | None

Reference to thumbnail image.

Source code in pyagenity/utils/message.py
class VideoBlock(BaseModel):
    """
    Video content block for messages.

    Attributes:
        type (Literal["video"]): Block type discriminator.
        media (MediaRef): Reference to video media.
        thumbnail (MediaRef | None): Reference to thumbnail image.
    """

    type: Literal["video"] = "video"
    media: MediaRef
    thumbnail: MediaRef | None = None
Attributes
media instance-attribute
media
thumbnail class-attribute instance-attribute
thumbnail = None
type class-attribute instance-attribute
type = 'video'

Functions

generate_id
generate_id(default_id)

Generate a message or tool call ID based on DI context and type.

Parameters:

Name Type Description Default
default_id
str | int | None

Default ID, returned as-is when it is provided and its Python type matches the configured ID type.

required

Returns:

Type Description
str | int

Generated or provided ID; its type is determined by the DI context.

Example

>>> generate_id("abc123")
'abc123'
>>> generate_id(None)
'a-uuid-string'

Source code in pyagenity/utils/message.py
def generate_id(default_id: str | int | None) -> str | int:
    """
    Generate a message or tool call ID based on DI context and type.

    Args:
        default_id (str | int | None): Default ID to use if provided and matches type.

    Returns:
        str | int: Generated or provided ID, type determined by DI context.

    Raises:
        None

    Example:
        >>> generate_id("abc123")
        'abc123'
        >>> generate_id(None)
        'a-uuid-string'
    """
    id_type = InjectQ.get_instance().try_get("generated_id_type", "string")
    generated_id = InjectQ.get_instance().try_get("generated_id", None)

    # if user provided an awaitable, resolve it
    if isinstance(generated_id, Awaitable):

        async def wait_for_id():
            return await generated_id

        generated_id = asyncio.run(wait_for_id())

    if generated_id:
        return generated_id

    if default_id:
        if id_type == "string" and isinstance(default_id, str):
            return default_id
        if id_type in ("int", "bigint") and isinstance(default_id, int):
            return default_id

    # if not matched or default_id is None, generate new id
    logger.debug(
        "Generating new id of type: %s. Default ID not provided or not matched %s",
        id_type,
        default_id,
    )

    if id_type == "int":
        return uuid4().int >> 32
    if id_type == "bigint":
        return uuid4().int >> 64
    return str(uuid4())
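
The order in which generate_id picks an ID can be easier to follow in isolation. The following is a minimal sketch of that precedence, assuming a plain dict named settings in place of the InjectQ container lookups; pick_id is a hypothetical name, and the awaitable branch from the source is omitted.

```python
from uuid import uuid4


def pick_id(default_id, settings):
    """Stdlib-only sketch of the precedence used by generate_id.

    `settings` is a plain dict standing in for the DI container (assumption).
    """
    id_type = settings.get("generated_id_type", "string")
    generated_id = settings.get("generated_id")

    # 1. A truthy configured ID always wins.
    if generated_id:
        return generated_id

    # 2. A default ID is used only when its Python type matches id_type.
    if default_id:
        if id_type == "string" and isinstance(default_id, str):
            return default_id
        if id_type in ("int", "bigint") and isinstance(default_id, int):
            return default_id

    # 3. Otherwise derive a fresh ID from a random 128-bit UUID.
    if id_type == "int":
        return uuid4().int >> 32  # at most 96 bits remain
    if id_type == "bigint":
        return uuid4().int >> 64  # at most 64 bits remain
    return str(uuid4())


print(pick_id("abc123", {}))                           # abc123
print(pick_id("abc123", {"generated_id": "fixed"}))    # fixed
print(type(pick_id("abc123", {"generated_id_type": "int"})).__name__)  # int
```

Two details follow from the checks as written in the source: falsy values such as 0 or an empty string are treated as "not provided" and fall through to generation, and the "int" branch can produce wider values (up to 96 bits) than the "bigint" branch (up to 64 bits).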

metrics

Lightweight metrics instrumentation utilities.

Design goals
  • Zero dependency by default.
  • Cheap no-op when disabled.
  • Pluggable exporter (e.g., Prometheus scrape formatting) later.
Usage

from pyagenity.utils.metrics import counter, timer

counter('messages_written_total').inc()
with timer('db_write_latency_ms'):
    ...

Classes:

Name Description
Counter
TimerMetric

Functions:

Name Description
counter
enable_metrics
snapshot

Return a point-in-time snapshot of metrics (thread-safe copy).

timer

Classes

Counter dataclass

Methods:

Name Description
__init__
inc

Attributes:

Name Type Description
name str
value int
Source code in pyagenity/utils/metrics.py
@dataclass
class Counter:
    name: str
    value: int = 0

    def inc(self, amount: int = 1) -> None:
        if not _ENABLED:
            return
        with _LOCK:
            self.value += amount
Attributes
name instance-attribute
name
value class-attribute instance-attribute
value = 0
Functions
__init__
__init__(name, value=0)
inc
inc(amount=1)
Source code in pyagenity/utils/metrics.py
def inc(self, amount: int = 1) -> None:
    if not _ENABLED:
        return
    with _LOCK:
        self.value += amount
TimerMetric dataclass

Methods:

Name Description
__init__
observe

Attributes:

Name Type Description
avg_ms float
count int
max_ms float
name str
total_ms float
Source code in pyagenity/utils/metrics.py
@dataclass
class TimerMetric:
    name: str
    count: int = 0
    total_ms: float = 0.0
    max_ms: float = 0.0

    def observe(self, duration_ms: float) -> None:
        if not _ENABLED:
            return
        with _LOCK:
            self.count += 1
            self.total_ms += duration_ms
            self.max_ms = max(self.max_ms, duration_ms)

    @property
    def avg_ms(self) -> float:
        if self.count == 0:
            return 0.0
        return self.total_ms / self.count
Attributes
avg_ms property
avg_ms
count class-attribute instance-attribute
count = 0
max_ms class-attribute instance-attribute
max_ms = 0.0
name instance-attribute
name
total_ms class-attribute instance-attribute
total_ms = 0.0
Functions
__init__
__init__(name, count=0, total_ms=0.0, max_ms=0.0)
observe
observe(duration_ms)
Source code in pyagenity/utils/metrics.py
def observe(self, duration_ms: float) -> None:
    if not _ENABLED:
        return
    with _LOCK:
        self.count += 1
        self.total_ms += duration_ms
        self.max_ms = max(self.max_ms, duration_ms)

Functions

counter
counter(name)
Source code in pyagenity/utils/metrics.py
def counter(name: str) -> Counter:
    with _LOCK:
        c = _COUNTERS.get(name)
        if c is None:
            c = Counter(name)
            _COUNTERS[name] = c
        return c
enable_metrics
enable_metrics(value)
Source code in pyagenity/utils/metrics.py
def enable_metrics(value: bool) -> None:  # simple toggle; acceptable global
    # Intentionally a module-level switch, so call sites only pay for a cheap check.
    globals()["_ENABLED"] = value
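
The effect of the module-level switch is that increments made while metrics are disabled are silently dropped rather than queued. The sketch below reproduces that behaviour with a self-contained stand-in for the module. It assumes metrics start disabled, which the source shown here does not confirm.

```python
import threading
from dataclasses import dataclass

_ENABLED = False  # assumption: disabled by default in this sketch
_LOCK = threading.Lock()
_COUNTERS = {}


def enable_metrics(value: bool) -> None:
    global _ENABLED
    _ENABLED = value


@dataclass
class Counter:
    name: str
    value: int = 0

    def inc(self, amount: int = 1) -> None:
        if not _ENABLED:  # cheap early exit while disabled
            return
        with _LOCK:
            self.value += amount


def counter(name: str) -> Counter:
    # Get-or-create under the lock so callers share one Counter per name.
    with _LOCK:
        return _COUNTERS.setdefault(name, Counter(name))


requests = counter("requests_total")
requests.inc()         # dropped: metrics are still disabled
enable_metrics(True)
requests.inc()
requests.inc(2)
print(requests.value)  # 3
```

Because the first increment happens before enable_metrics(True), only the later two calls are counted.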
snapshot
snapshot()

Return a point-in-time snapshot of metrics (thread-safe copy).

Source code in pyagenity/utils/metrics.py
def snapshot() -> dict:
    """Return a point-in-time snapshot of metrics (thread-safe copy)."""
    with _LOCK:
        return {
            "counters": {k: v.value for k, v in _COUNTERS.items()},
            "timers": {
                k: {
                    "count": t.count,
                    "avg_ms": t.avg_ms,
                    "max_ms": t.max_ms,
                }
                for k, t in _TIMERS.items()
            },
        }
timer
timer(name)
Source code in pyagenity/utils/metrics.py
def timer(name: str) -> _TimerCtx:  # convenience factory
    metric = _TIMERS.get(name)
    if metric is None:
        with _LOCK:
            metric = _TIMERS.get(name)
            if metric is None:
                metric = TimerMetric(name)
                _TIMERS[name] = metric
    return _TimerCtx(metric)
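
The _TimerCtx class returned by timer is not shown in this section. As a rough illustration of how such a timing context might work, the sketch below uses contextlib.contextmanager and time.perf_counter. The timer function here is a hypothetical stand-in, and recording the duration even when the block raises is an assumption, not confirmed behaviour of the library.

```python
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass

_LOCK = threading.Lock()
_TIMERS = {}


@dataclass
class TimerMetric:
    name: str
    count: int = 0
    total_ms: float = 0.0
    max_ms: float = 0.0

    def observe(self, duration_ms: float) -> None:
        with _LOCK:
            self.count += 1
            self.total_ms += duration_ms
            self.max_ms = max(self.max_ms, duration_ms)

    @property
    def avg_ms(self) -> float:
        return self.total_ms / self.count if self.count else 0.0


@contextmanager
def timer(name: str):
    """Hypothetical stand-in: time the with-block in milliseconds."""
    with _LOCK:
        metric = _TIMERS.setdefault(name, TimerMetric(name))
    start = time.perf_counter()
    try:
        yield metric
    finally:
        # Record the elapsed time even if the block raises (assumption).
        metric.observe((time.perf_counter() - start) * 1000.0)


with timer("db_write_latency_ms"):
    time.sleep(0.01)

m = _TIMERS["db_write_latency_ms"]
print(m.count)  # 1
```

The lock is released before the timed block runs, so a slow block does not hold up other metrics.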

reducers

Reducer utilities for merging and replacing lists and values in agent state.

This module provides generic and message-specific reducers for combining lists, replacing values, and appending items while avoiding duplicates.

Functions:

Name Description
add_messages

Adds messages to a list, avoiding duplicates by message_id.

replace_messages

Replaces the entire message list.

append_items

Appends items to a list, avoiding duplicates by id.

replace_value

Replaces a value with another.

Functions

add_messages
add_messages(left, right)

Adds messages to the list, avoiding duplicates by message_id.

Parameters:

Name Type Description Default
left
list[Message]

Existing list of messages.

required
right
list[Message]

New messages to add.

required

Returns:

Type Description
list[Message]

Combined list; messages in right whose message_id already appears in left are dropped.

Example

>>> add_messages([msg1], [msg2, msg1])
[msg1, msg2]

Source code in pyagenity/utils/reducers.py
def add_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Adds messages to the list, avoiding duplicates by message_id.

    Args:
        left (list[Message]): Existing list of messages.
        right (list[Message]): New messages to add.

    Returns:
        list[Message]: Combined list with unique messages.

    Example:
        >>> add_messages([msg1], [msg2, msg1])
        [msg1, msg2]
    """
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids]
    return left + right
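
To make the de-duplication rule concrete, the sketch below runs the same logic against a hypothetical Msg dataclass that stands in for Message (only message_id matters here). It shows two consequences of comparing against left only: an incoming message with a known ID does not overwrite the existing one, and duplicates within right itself are kept.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for Message; only message_id is relevant."""
    message_id: str
    text: str = ""


def add_messages(left, right):
    # Same logic as the source above: drop items of `right` already in `left`.
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids]
    return left + right


m1, m2 = Msg("1", "hi"), Msg("2", "hello")
m1_edited = Msg("1", "hi (edited)")

merged = add_messages([m1], [m2, m1_edited, m2])
print([m.message_id for m in merged])  # ['1', '2', '2']
print(merged[0].text)                  # hi
```

Callers that need edits to replace earlier messages, or that may pass repeated items in right, would have to handle those cases before calling the reducer.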
append_items
append_items(left, right)

Appends items to a list, avoiding duplicates by item.id.

Parameters:

Name Type Description Default
left
list

Existing list of items (must have .id attribute).

required
right
list

New items to add.

required

Returns:

Name Type Description
list list

Combined list with unique items.

Example

>>> append_items([item1], [item2, item1])
[item1, item2]

Source code in pyagenity/utils/reducers.py
def append_items(left: list, right: list) -> list:
    """
    Appends items to a list, avoiding duplicates by item.id.

    Args:
        left (list): Existing list of items (must have .id attribute).
        right (list): New items to add.

    Returns:
        list: Combined list with unique items.

    Example:
        >>> append_items([item1], [item2, item1])
        [item1, item2]
    """
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right
replace_messages
replace_messages(left, right)

Replaces the entire message list with a new one.

Parameters:

Name Type Description Default
left
list[Message]

Existing list of messages (ignored).

required
right
list[Message]

New list of messages.

required

Returns:

Type Description
list[Message]

The new message list.

Example

>>> replace_messages([msg1], [msg2])
[msg2]

Source code in pyagenity/utils/reducers.py
def replace_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Replaces the entire message list with a new one.

    Args:
        left (list[Message]): Existing list of messages (ignored).
        right (list[Message]): New list of messages.

    Returns:
        list[Message]: The new message list.

    Example:
        >>> replace_messages([msg1], [msg2])
        [msg2]
    """
    return right
replace_value
replace_value(left, right)

Replaces a value with another.

Parameters:

Name Type Description Default
left

Existing value (ignored).

required
right

New value to use.

required

Returns:

Name Type Description
Any

The new value.

Example

>>> replace_value(1, 2)
2

Source code in pyagenity/utils/reducers.py
def replace_value(left, right):
    """
    Replaces a value with another.

    Args:
        left: Existing value (ignored).
        right: New value to use.

    Returns:
        Any: The new value.

    Example:
        >>> replace_value(1, 2)
        2
    """
    return right
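
All four reducers share the (left, right) signature, so they can be applied per field when merging an update into existing state. The sketch below shows one way this might look. The REDUCERS mapping and merge_state helper are hypothetical; how PyAgenity actually wires reducers to state fields is not shown in this section.

```python
from types import SimpleNamespace


def replace_value(left, right):
    return right


def append_items(left, right):
    left_ids = {item.id for item in left}
    return left + [item for item in right if item.id not in left_ids]


# Hypothetical mapping from state field name to its reducer.
REDUCERS = {"step": replace_value, "artifacts": append_items}


def merge_state(state, update):
    """Return a new state dict, combining each updated field via its reducer."""
    merged = dict(state)
    for key, new_value in update.items():
        reducer = REDUCERS.get(key, replace_value)
        merged[key] = reducer(state[key], new_value) if key in state else new_value
    return merged


a, b = SimpleNamespace(id="a"), SimpleNamespace(id="b")
state = {"step": 1, "artifacts": [a]}

new_state = merge_state(state, {"step": 2, "artifacts": [a, b]})
print(new_state["step"])                        # 2
print([x.id for x in new_state["artifacts"]])   # ['a', 'b']
```

Since the reducers build new lists instead of mutating left, the original state is left unchanged.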

thread_info

Thread metadata and status tracking for agent graphs.

This module defines the ThreadInfo model, which tracks thread identity, user, metadata, status, and timestamps for agent graph execution and orchestration.

Classes:

Name Description
ThreadInfo

Metadata and status for a thread in agent execution.

Classes

ThreadInfo

Bases: BaseModel

Metadata and status for a thread in agent execution.

Attributes:

Name Type Description
thread_id int | str

Unique identifier for the thread.

thread_name str | None

Optional name for the thread.

user_id int | str | None

Optional user identifier associated with the thread.

metadata dict[str, Any] | None

Optional metadata for the thread.

updated_at datetime | None

Timestamp of last update.

stop_requested bool

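Whether a stop has been requested for the thread. This attribute is listed in the docstring, but the class body shown in the source does not declare a stop_requested field.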

run_id str | None

Optional run identifier for the thread execution.

Example

>>> ThreadInfo(thread_id=1, thread_name="main", user_id=42)

Source code in pyagenity/utils/thread_info.py
class ThreadInfo(BaseModel):
    """
    Metadata and status for a thread in agent execution.

    Attributes:
        thread_id (int | str): Unique identifier for the thread.
        thread_name (str | None): Optional name for the thread.
        user_id (int | str | None): Optional user identifier associated with the thread.
        metadata (dict[str, Any] | None): Optional metadata for the thread.
        updated_at (datetime | None): Timestamp of last update.
        stop_requested (bool): Whether a stop has been requested for the thread.
        run_id (str | None): Optional run identifier for the thread execution.

    Example:
        >>> ThreadInfo(thread_id=1, thread_name="main", user_id=42)
    """

    thread_id: int | str
    thread_name: str | None = None
    user_id: int | str | None = None
    metadata: dict[str, Any] | None = None
    updated_at: datetime | None = None
    run_id: str | None = None
Attributes
metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
thread_id instance-attribute
thread_id
thread_name class-attribute instance-attribute
thread_name = None
updated_at class-attribute instance-attribute
updated_at = None
user_id class-attribute instance-attribute
user_id = None

thread_name_generator

Thread name generation utilities for AI agent conversations.

This module provides the AIThreadNameGenerator class and helper function for generating meaningful, varied, and human-friendly thread names for AI chat sessions using different patterns and themes.

Classes:

Name Description
AIThreadNameGenerator

Generates thread names using adjective-noun, action-based, and compound descriptive patterns.

Functions:

Name Description
generate_dummy_thread_name

Convenience function for generating a thread name.

Classes

AIThreadNameGenerator

Generates meaningful, varied thread names for AI conversations using different patterns and themes. Patterns include adjective-noun, action-based, and compound descriptive names.

Example

>>> AIThreadNameGenerator().generate_name()
'thoughtful-dialogue'

Methods:

Name Description
generate_action_name

Generate an action-based thread name for a more dynamic feel.

generate_compound_name

Generate a compound descriptive thread name.

generate_name

Generate a meaningful thread name using random pattern selection.

generate_simple_name

Generate a simple adjective-noun combination for a thread name.

Attributes:

Name Type Description
ACTION_PATTERNS
ADJECTIVES
COMPOUND_PATTERNS
NOUNS
Source code in pyagenity/utils/thread_name_generator.py
class AIThreadNameGenerator:
    """
    Generates meaningful, varied thread names for AI conversations using different
    patterns and themes. Patterns include adjective-noun, action-based, and compound
    descriptive names.

    Example:
        >>> AIThreadNameGenerator().generate_name()
        'thoughtful-dialogue'
    """

    # Enhanced adjectives grouped by semantic meaning
    ADJECTIVES = [
        # Intellectual
        "thoughtful",
        "insightful",
        "analytical",
        "logical",
        "strategic",
        "methodical",
        "systematic",
        "comprehensive",
        "detailed",
        "precise",
        # Creative
        "creative",
        "imaginative",
        "innovative",
        "artistic",
        "expressive",
        "original",
        "inventive",
        "inspired",
        "visionary",
        "whimsical",
        # Emotional/Social
        "engaging",
        "collaborative",
        "meaningful",
        "productive",
        "harmonious",
        "enlightening",
        "empathetic",
        "supportive",
        "encouraging",
        "uplifting",
        # Dynamic
        "dynamic",
        "energetic",
        "vibrant",
        "lively",
        "spirited",
        "active",
        "flowing",
        "adaptive",
        "responsive",
        "interactive",
        # Quality-focused
        "focused",
        "dedicated",
        "thorough",
        "meticulous",
        "careful",
        "patient",
        "persistent",
        "resilient",
        "determined",
        "ambitious",
    ]

    # Enhanced nouns with more conversational context
    NOUNS = [
        # Conversation-related
        "dialogue",
        "conversation",
        "discussion",
        "exchange",
        "chat",
        "consultation",
        "session",
        "meeting",
        "interaction",
        "communication",
        # Journey/Process
        "journey",
        "exploration",
        "adventure",
        "quest",
        "voyage",
        "expedition",
        "discovery",
        "investigation",
        "research",
        "study",
        # Conceptual
        "insight",
        "vision",
        "perspective",
        "understanding",
        "wisdom",
        "knowledge",
        "learning",
        "growth",
        "development",
        "progress",
        # Solution-oriented
        "solution",
        "approach",
        "strategy",
        "method",
        "framework",
        "plan",
        "blueprint",
        "pathway",
        "route",
        "direction",
        # Creative/Abstract
        "canvas",
        "story",
        "narrative",
        "symphony",
        "composition",
        "creation",
        "masterpiece",
        "design",
        "pattern",
        "concept",
        # Collaborative
        "partnership",
        "collaboration",
        "alliance",
        "connection",
        "bond",
        "synergy",
        "harmony",
        "unity",
        "cooperation",
        "teamwork",
    ]

    # Action-based patterns for more dynamic names
    ACTION_PATTERNS = {
        "exploring": ["ideas", "concepts", "possibilities", "mysteries", "frontiers", "depths"],
        "building": ["solutions", "understanding", "connections", "frameworks", "bridges"],
        "discovering": ["insights", "patterns", "answers", "truths", "secrets", "wisdom"],
        "crafting": ["responses", "solutions", "stories", "strategies", "experiences"],
        "navigating": ["challenges", "questions", "complexities", "territories", "paths"],
        "unlocking": ["potential", "mysteries", "possibilities", "creativity", "knowledge"],
        "weaving": ["ideas", "stories", "connections", "patterns", "narratives"],
        "illuminating": ["concepts", "mysteries", "paths", "truths", "possibilities"],
    }

    # Descriptive compound patterns
    COMPOUND_PATTERNS = [
        ("deep", ["dive", "thought", "reflection", "analysis", "exploration"]),
        ("bright", ["spark", "idea", "insight", "moment", "flash"]),
        ("fresh", ["perspective", "approach", "start", "take", "view"]),
        ("open", ["dialogue", "discussion", "conversation", "exchange", "forum"]),
        ("creative", ["flow", "spark", "burst", "stream", "wave"]),
        ("mindful", ["moment", "pause", "reflection", "consideration", "thought"]),
        ("collaborative", ["effort", "venture", "journey", "exploration", "creation"]),
    ]

    def generate_simple_name(self, separator: str = "-") -> str:
        """
        Generate a simple adjective-noun combination for a thread name.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "thoughtful-dialogue" or "creative-exploration".

        Example:
            >>> AIThreadNameGenerator().generate_simple_name()
            'creative-exploration'
        """
        adj = secrets.choice(self.ADJECTIVES)
        noun = secrets.choice(self.NOUNS)
        return f"{adj}{separator}{noun}"

    def generate_action_name(self, separator: str = "-") -> str:
        """
        Generate an action-based thread name for a more dynamic feel.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "exploring-ideas" or "building-understanding".

        Example:
            >>> AIThreadNameGenerator().generate_action_name()
            'building-connections'
        """
        action = secrets.choice(list(self.ACTION_PATTERNS.keys()))
        target = secrets.choice(self.ACTION_PATTERNS[action])
        return f"{action}{separator}{target}"

    def generate_compound_name(self, separator: str = "-") -> str:
        """
        Generate a compound descriptive thread name.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "deep-dive" or "bright-spark".

        Example:
            >>> AIThreadNameGenerator().generate_compound_name()
            'deep-reflection'
        """
        base, options = secrets.choice(self.COMPOUND_PATTERNS)
        complement = secrets.choice(options)
        return f"{base}{separator}{complement}"

    def generate_name(self, separator: str = "-") -> str:
        """
        Generate a meaningful thread name using random pattern selection.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: A meaningful thread name from various patterns.

        Example:
            >>> AIThreadNameGenerator().generate_name()
            'engaging-discussion'
        """
        # Randomly choose between different naming patterns
        pattern = secrets.choice(["simple", "action", "compound"])

        if pattern == "simple":
            return self.generate_simple_name(separator)
        if pattern == "action":
            return self.generate_action_name(separator)
        # compound
        return self.generate_compound_name(separator)
Attributes
ACTION_PATTERNS class-attribute instance-attribute
ACTION_PATTERNS = {'exploring': ['ideas', 'concepts', 'possibilities', 'mysteries', 'frontiers', 'depths'], 'building': ['solutions', 'understanding', 'connections', 'frameworks', 'bridges'], 'discovering': ['insights', 'patterns', 'answers', 'truths', 'secrets', 'wisdom'], 'crafting': ['responses', 'solutions', 'stories', 'strategies', 'experiences'], 'navigating': ['challenges', 'questions', 'complexities', 'territories', 'paths'], 'unlocking': ['potential', 'mysteries', 'possibilities', 'creativity', 'knowledge'], 'weaving': ['ideas', 'stories', 'connections', 'patterns', 'narratives'], 'illuminating': ['concepts', 'mysteries', 'paths', 'truths', 'possibilities']}
ADJECTIVES class-attribute instance-attribute
ADJECTIVES = ['thoughtful', 'insightful', 'analytical', 'logical', 'strategic', 'methodical', 'systematic', 'comprehensive', 'detailed', 'precise', 'creative', 'imaginative', 'innovative', 'artistic', 'expressive', 'original', 'inventive', 'inspired', 'visionary', 'whimsical', 'engaging', 'collaborative', 'meaningful', 'productive', 'harmonious', 'enlightening', 'empathetic', 'supportive', 'encouraging', 'uplifting', 'dynamic', 'energetic', 'vibrant', 'lively', 'spirited', 'active', 'flowing', 'adaptive', 'responsive', 'interactive', 'focused', 'dedicated', 'thorough', 'meticulous', 'careful', 'patient', 'persistent', 'resilient', 'determined', 'ambitious']
COMPOUND_PATTERNS class-attribute instance-attribute
COMPOUND_PATTERNS = [('deep', ['dive', 'thought', 'reflection', 'analysis', 'exploration']), ('bright', ['spark', 'idea', 'insight', 'moment', 'flash']), ('fresh', ['perspective', 'approach', 'start', 'take', 'view']), ('open', ['dialogue', 'discussion', 'conversation', 'exchange', 'forum']), ('creative', ['flow', 'spark', 'burst', 'stream', 'wave']), ('mindful', ['moment', 'pause', 'reflection', 'consideration', 'thought']), ('collaborative', ['effort', 'venture', 'journey', 'exploration', 'creation'])]
NOUNS class-attribute instance-attribute
NOUNS = ['dialogue', 'conversation', 'discussion', 'exchange', 'chat', 'consultation', 'session', 'meeting', 'interaction', 'communication', 'journey', 'exploration', 'adventure', 'quest', 'voyage', 'expedition', 'discovery', 'investigation', 'research', 'study', 'insight', 'vision', 'perspective', 'understanding', 'wisdom', 'knowledge', 'learning', 'growth', 'development', 'progress', 'solution', 'approach', 'strategy', 'method', 'framework', 'plan', 'blueprint', 'pathway', 'route', 'direction', 'canvas', 'story', 'narrative', 'symphony', 'composition', 'creation', 'masterpiece', 'design', 'pattern', 'concept', 'partnership', 'collaboration', 'alliance', 'connection', 'bond', 'synergy', 'harmony', 'unity', 'cooperation', 'teamwork']
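
The sizes of these lists determine how often a given name can recur. The sketch below estimates the chance of drawing one specific name through each pattern, assuming generate_name picks among its three patterns uniformly and secrets.choice is uniform over each list. The counts are taken from the attributes listed above (50 adjectives, 60 nouns, 8 actions, 7 compound bases with 5 complements each).

```python
from fractions import Fraction

# Sizes counted from the class attributes listed above.
N_ADJECTIVES = 50
N_NOUNS = 60
ACTION_TARGETS = {
    "exploring": 6, "building": 5, "discovering": 6, "crafting": 5,
    "navigating": 5, "unlocking": 5, "weaving": 5, "illuminating": 5,
}
N_COMPOUND_BASES = 7
N_COMPOUND_OPTIONS = 5  # every base has five complements

PATTERN = Fraction(1, 3)  # one of three patterns, chosen uniformly

# Probability of one specific name via each pattern.
p_simple = PATTERN * Fraction(1, N_ADJECTIVES * N_NOUNS)
p_action = PATTERN * Fraction(1, len(ACTION_TARGETS)) * Fraction(1, ACTION_TARGETS["building"])
p_compound = PATTERN * Fraction(1, N_COMPOUND_BASES * N_COMPOUND_OPTIONS)

print(p_simple)    # 1/9000
print(p_action)    # 1/120
print(p_compound)  # 1/105
```

Action and compound names therefore repeat far more often than simple ones, so generated names are better treated as display labels than as unique identifiers. A few names, such as "collaborative-journey", are reachable through both the simple and compound patterns, which raises their total probability slightly.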
Functions
generate_action_name
generate_action_name(separator='-')

Generate an action-based thread name for a more dynamic feel.

Parameters:

Name Type Description Default
separator str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

Name like "exploring-ideas" or "building-understanding".

Example

>>> AIThreadNameGenerator().generate_action_name()
'building-connections'

Source code in pyagenity/utils/thread_name_generator.py
def generate_action_name(self, separator: str = "-") -> str:
    """
    Generate an action-based thread name for a more dynamic feel.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "exploring-ideas" or "building-understanding".

    Example:
        >>> AIThreadNameGenerator().generate_action_name()
        'building-connections'
    """
    action = secrets.choice(list(self.ACTION_PATTERNS.keys()))
    target = secrets.choice(self.ACTION_PATTERNS[action])
    return f"{action}{separator}{target}"
generate_compound_name
generate_compound_name(separator='-')

Generate a compound descriptive thread name.

Parameters:

Name Type Description Default
separator str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

Name like "deep-dive" or "bright-spark".

Example

>>> AIThreadNameGenerator().generate_compound_name()
'deep-reflection'

Source code in pyagenity/utils/thread_name_generator.py
def generate_compound_name(self, separator: str = "-") -> str:
    """
    Generate a compound descriptive thread name.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "deep-dive" or "bright-spark".

    Example:
        >>> AIThreadNameGenerator().generate_compound_name()
        'deep-reflection'
    """
    base, options = secrets.choice(self.COMPOUND_PATTERNS)
    complement = secrets.choice(options)
    return f"{base}{separator}{complement}"
generate_name
generate_name(separator='-')

Generate a meaningful thread name using random pattern selection.

Parameters:

Name Type Description Default
separator str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

A meaningful thread name from various patterns.

Example

>>> AIThreadNameGenerator().generate_name()
'engaging-discussion'

Source code in pyagenity/utils/thread_name_generator.py
def generate_name(self, separator: str = "-") -> str:
    """
    Generate a meaningful thread name using random pattern selection.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: A meaningful thread name from various patterns.

    Example:
        >>> AIThreadNameGenerator().generate_name()
        'engaging-discussion'
    """
    # Randomly choose between different naming patterns
    pattern = secrets.choice(["simple", "action", "compound"])

    if pattern == "simple":
        return self.generate_simple_name(separator)
    if pattern == "action":
        return self.generate_action_name(separator)
    # compound
    return self.generate_compound_name(separator)
generate_simple_name
generate_simple_name(separator='-')

Generate a simple adjective-noun combination for a thread name.

Parameters:

Name Type Description Default
separator str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

Name like "thoughtful-dialogue" or "creative-exploration".

Example

>>> AIThreadNameGenerator().generate_simple_name()
'creative-exploration'

Source code in pyagenity/utils/thread_name_generator.py
def generate_simple_name(self, separator: str = "-") -> str:
    """
    Generate a simple adjective-noun combination for a thread name.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "thoughtful-dialogue" or "creative-exploration".

    Example:
        >>> AIThreadNameGenerator().generate_simple_name()
        'creative-exploration'
    """
    adj = secrets.choice(self.ADJECTIVES)
    noun = secrets.choice(self.NOUNS)
    return f"{adj}{separator}{noun}"

Functions

generate_dummy_thread_name
generate_dummy_thread_name(separator='-')

Generate a meaningful English name for an AI chat thread.

Parameters:

Name Type Description Default
separator
str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

Example

>>> generate_dummy_thread_name()
'creative-exploration'

Source code in pyagenity/utils/thread_name_generator.py
def generate_dummy_thread_name(separator: str = "-") -> str:
    """
    Generate a meaningful English name for an AI chat thread.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

    Example:
        >>> generate_dummy_thread_name()
        'creative-exploration'
    """
    generator = AIThreadNameGenerator()
    return generator.generate_name(separator)
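
The separator argument is passed straight through to whichever pattern is chosen, so it applies to every kind of name. The sketch below mirrors the selection logic with deliberately trimmed word lists. make_name is a hypothetical stand-in for generate_dummy_thread_name, and the lists here are not the full class attributes.

```python
import secrets

# Trimmed word lists for illustration; the real class attributes are longer.
ADJECTIVES = ["thoughtful", "creative"]
NOUNS = ["dialogue", "exploration"]
ACTION_PATTERNS = {"exploring": ["ideas"], "building": ["bridges"]}
COMPOUND_PATTERNS = [("deep", ["dive"]), ("bright", ["spark"])]


def make_name(separator: str = "-") -> str:
    """Pick a pattern at random, then fill it in, as generate_name does."""
    pattern = secrets.choice(["simple", "action", "compound"])
    if pattern == "simple":
        first, second = secrets.choice(ADJECTIVES), secrets.choice(NOUNS)
    elif pattern == "action":
        first = secrets.choice(list(ACTION_PATTERNS))
        second = secrets.choice(ACTION_PATTERNS[first])
    else:
        first, options = secrets.choice(COMPOUND_PATTERNS)
        second = secrets.choice(options)
    return f"{first}{separator}{second}"


name = make_name(separator="_")
first, second = name.split("_")
print(name)  # output varies between runs
```

Every name consists of exactly two words, so splitting on the separator yields two parts as long as the separator does not occur inside the words themselves.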